diff options
Diffstat (limited to 'sql')
85 files changed, 3245 insertions, 487 deletions
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index 6f162f4d84d..18508468f60 100755 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -75,6 +75,7 @@ SET (SQL_SOURCE rpl_rli.cc rpl_mi.cc sql_servers.cc sql_connect.cc scheduler.cc sql_profile.cc event_parse_data.cc + rpl_handler.cc ${PROJECT_SOURCE_DIR}/sql/sql_yacc.cc ${PROJECT_SOURCE_DIR}/sql/sql_yacc.h ${PROJECT_SOURCE_DIR}/include/mysqld_error.h diff --git a/sql/Makefile.am b/sql/Makefile.am index 2a12de2eaf6..afce7db59f2 100644 --- a/sql/Makefile.am +++ b/sql/Makefile.am @@ -76,7 +76,8 @@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \ sql_plugin.h authors.h event_parse_data.h \ event_data_objects.h event_scheduler.h \ sql_partition.h partition_info.h partition_element.h \ - contributors.h sql_servers.h + contributors.h sql_servers.h \ + rpl_handler.h replication.h mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \ item.cc item_sum.cc item_buff.cc item_func.cc \ @@ -120,7 +121,8 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \ event_queue.cc event_db_repository.cc events.cc \ sql_plugin.cc sql_binlog.cc \ sql_builtin.cc sql_tablespace.cc partition_info.cc \ - sql_servers.cc event_parse_data.cc + sql_servers.cc event_parse_data.cc \ + rpl_handler.cc nodist_mysqld_SOURCES = mini_client_errors.c pack.c client.c my_time.c my_user.c diff --git a/sql/authors.h b/sql/authors.h index dfe3b143e2f..90cdae9c0a0 100644 --- a/sql/authors.h +++ b/sql/authors.h @@ -1,3 +1,6 @@ +#ifndef AUTHORS_INCLUDED +#define AUTHORS_INCLUDED + /* Copyright (C) 2005-2006 MySQL AB This program is free software; you can redistribute it and/or modify @@ -150,3 +153,5 @@ struct show_table_authors_st show_table_authors[]= { "SHA1(), AES_ENCRYPT(), AES_DECRYPT(), bug fixing" }, {NULL, NULL, NULL} }; + +#endif /* AUTHORS_INCLUDED */ diff --git a/sql/client_settings.h b/sql/client_settings.h index 4f06c15a29e..fd50bfdbb88 100644 --- a/sql/client_settings.h +++ b/sql/client_settings.h @@ -14,6 +14,12 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ +#ifndef CLIENT_SETTINGS_INCLUDED +#define CLIENT_SETTINGS_INCLUDED +#else +#error You have already included an client_settings.h and it should not be included twice +#endif /* CLIENT_SETTINGS_INCLUDED */ + #include <thr_alarm.h> #define CLIENT_CAPABILITIES (CLIENT_LONG_PASSWORD | CLIENT_LONG_FLAG | \ diff --git a/sql/contributors.h b/sql/contributors.h index 87001e29d88..6cf8bb88e3b 100644 --- a/sql/contributors.h +++ b/sql/contributors.h @@ -1,3 +1,6 @@ +#ifndef CONTRIBUTORS_INCLUDED +#define CONTRIBUTORS_INCLUDED + /* Copyright (C) 2006 MySQL AB This program is free software; you can redistribute it and/or modify @@ -37,3 +40,5 @@ struct show_table_contributors_st show_table_contributors[]= { {"Mark Shuttleworth", "London, UK.", "EFF contribution for UC2006 Auction"}, {NULL, NULL, NULL} }; + +#endif /* CONTRIBUTORS_INCLUDED */ diff --git a/sql/field.h b/sql/field.h index a9299256f88..36569428ee0 100644 --- a/sql/field.h +++ b/sql/field.h @@ -1,3 +1,6 @@ +#ifndef FIELD_INCLUDED +#define FIELD_INCLUDED + /* Copyright 2000-2008 MySQL AB, 2008 Sun Microsystems, Inc. This program is free software; you can redistribute it and/or modify @@ -2180,3 +2183,5 @@ int set_field_to_null_with_conversions(Field *field, bool no_conversions); #define f_no_default(x) (x & FIELDFLAG_NO_DEFAULT) #define f_bit_as_char(x) ((x) & FIELDFLAG_TREAT_BIT_AS_CHAR) #define f_is_hex_escape(x) ((x) & FIELDFLAG_HEX_ESCAPE) + +#endif /* FIELD_INCLUDED */ diff --git a/sql/gstream.h b/sql/gstream.h index 1ef90ad5bf0..ea7158ee1a3 100644 --- a/sql/gstream.h +++ b/sql/gstream.h @@ -1,3 +1,6 @@ +#ifndef GSTREAM_INCLUDED +#define GSTREAM_INCLUDED + /* Copyright (C) 2000-2004 MySQL AB This program is free software; you can redistribute it and/or modify @@ -73,3 +76,5 @@ protected: char *m_err_msg; CHARSET_INFO *m_charset; }; + +#endif /* GSTREAM_INCLUDED */ diff --git a/sql/ha_ndbcluster.h b/sql/ha_ndbcluster.h index a17323d3fd6..662655c42b9 100644 --- a/sql/ha_ndbcluster.h +++ b/sql/ha_ndbcluster.h @@ -1,3 +1,6 @@ +#ifndef HA_NDBCLUSTER_INCLUDED +#define HA_NDBCLUSTER_INCLUDED + /* Copyright (C) 2000-2003 MySQL AB This program is free software; you can redistribute it and/or modify @@ -581,3 +584,5 @@ static const int ndbcluster_hton_name_length=sizeof(ndbcluster_hton_name)-1; extern int ndbcluster_terminating; extern int ndb_util_thread_running; extern pthread_cond_t COND_ndb_util_ready; + +#endif /* HA_NDBCLUSTER_INCLUDED */ diff --git a/sql/ha_ndbcluster_binlog.h b/sql/ha_ndbcluster_binlog.h index 1cad643e5ec..d80dfe9ee74 100644 --- a/sql/ha_ndbcluster_binlog.h +++ b/sql/ha_ndbcluster_binlog.h @@ -1,3 +1,6 @@ +#ifndef HA_NDBCLUSTER_BINLOG_INCLUDED +#define HA_NDBCLUSTER_BINLOG_INCLUDED + /* Copyright (C) 2000-2003 MySQL AB This program is free software; you can redistribute it and/or modify @@ -225,3 +228,5 @@ set_thd_ndb(THD *thd, Thd_ndb *thd_ndb) { thd_set_ha_data(thd, ndbcluster_hton, thd_ndb); } Ndb* check_ndb_in_thd(THD* thd); + +#endif /* HA_NDBCLUSTER_BINLOG_INCLUDED */ diff --git a/sql/ha_ndbcluster_cond.h b/sql/ha_ndbcluster_cond.h index 4401a93c9e1..4ccc7e062ec 100644 --- a/sql/ha_ndbcluster_cond.h +++ b/sql/ha_ndbcluster_cond.h @@ -1,3 +1,6 @@ +#ifndef HA_NDBCLUSTER_COND_INCLUDED +#define HA_NDBCLUSTER_COND_INCLUDED + /* Copyright (C) 2000-2007 MySQL AB This program is free software; you can redistribute it and/or modify @@ -486,3 +489,5 @@ private: Ndb_cond_stack *m_cond_stack; }; + +#endif /* HA_NDBCLUSTER_COND_INCLUDED */ diff --git a/sql/ha_ndbcluster_tables.h b/sql/ha_ndbcluster_tables.h index c6bc8f577f8..ba2e8ec251b 100644 --- a/sql/ha_ndbcluster_tables.h +++ b/sql/ha_ndbcluster_tables.h @@ -1,3 +1,6 @@ +#ifndef HA_NDBCLUSTER_TABLES_INCLUDED +#define HA_NDBCLUSTER_TABLES_INCLUDED + /* Copyright (C) 2000-2003 MySQL AB This program is free software; you can redistribute it and/or modify @@ -21,3 +24,5 @@ #define OLD_NDB_APPLY_TABLE "apply_status" #define NDB_SCHEMA_TABLE "ndb_schema" #define OLD_NDB_SCHEMA_TABLE "schema" + +#endif /* HA_NDBCLUSTER_TABLES_INCLUDED */ diff --git a/sql/ha_partition.h b/sql/ha_partition.h index 8a81a759e2a..804db028953 100644 --- a/sql/ha_partition.h +++ b/sql/ha_partition.h @@ -1,3 +1,6 @@ +#ifndef HA_PARTITION_INCLUDED +#define HA_PARTITION_INCLUDED + /* Copyright 2005-2008 MySQL AB, 2008 Sun Microsystems, Inc. This program is free software; you can redistribute it and/or modify @@ -1090,3 +1093,5 @@ public: virtual void append_create_info(String *packet) */ }; + +#endif /* HA_PARTITION_INCLUDED */ diff --git a/sql/handler.cc b/sql/handler.cc index b44c799f499..e3d22fe1bec 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -24,6 +24,7 @@ #endif #include "mysql_priv.h" +#include "rpl_handler.h" #include "rpl_filter.h" #include <myisampack.h> #include <errno.h> @@ -221,6 +222,8 @@ handlerton *ha_checktype(THD *thd, enum legacy_db_type database_type, return NULL; } + RUN_HOOK(transaction, after_rollback, (thd, FALSE)); + switch (database_type) { #ifndef NO_HASH case DB_TYPE_HASH: @@ -413,7 +416,13 @@ int ha_finalize_handlerton(st_plugin_int *plugin) reuse an array slot. Otherwise the number of uninstall/install cycles would be limited. */ - hton2plugin[hton->slot]= NULL; + if (hton->slot != HA_SLOT_UNDEF) + { + /* Make sure we are not unpluging another plugin */ + DBUG_ASSERT(hton2plugin[hton->slot] == plugin); + DBUG_ASSERT(hton->slot < MAX_HA); + hton2plugin[hton->slot]= NULL; + } my_free((uchar*)hton, MYF(0)); @@ -430,6 +439,15 @@ int ha_initialize_handlerton(st_plugin_int *plugin) hton= (handlerton *)my_malloc(sizeof(handlerton), MYF(MY_WME | MY_ZEROFILL)); + + if (hton == NULL) + { + sql_print_error("Unable to allocate memory for plugin '%s' handlerton.", + plugin->name.str); + goto err_no_hton_memory; + } + + hton->slot= HA_SLOT_UNDEF; /* Historical Requirement */ plugin->data= hton; // shortcut for the future if (plugin->plugin->init && plugin->plugin->init(hton)) @@ -540,6 +558,7 @@ err_deinit: err: my_free((uchar*) hton, MYF(0)); +err_no_hton_memory: plugin->data= NULL; DBUG_RETURN(1); } @@ -1190,6 +1209,7 @@ int ha_commit_trans(THD *thd, bool all) if (cookie) tc_log->unlog(cookie, xid); DBUG_EXECUTE_IF("crash_commit_after", abort();); + RUN_HOOK(transaction, after_commit, (thd, FALSE)); end: if (rw_trans) start_waiting_global_read_lock(thd); @@ -1337,6 +1357,7 @@ int ha_rollback_trans(THD *thd, bool all) push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN, ER_WARNING_NOT_COMPLETE_ROLLBACK, ER(ER_WARNING_NOT_COMPLETE_ROLLBACK)); + RUN_HOOK(transaction, after_rollback, (thd, FALSE)); DBUG_RETURN(error); } @@ -1371,7 +1392,14 @@ int ha_autocommit_or_rollback(THD *thd, int error) thd->variables.tx_isolation=thd->session_tx_isolation; } + else #endif + { + if (!error) + RUN_HOOK(transaction, after_commit, (thd, FALSE)); + else + RUN_HOOK(transaction, after_rollback, (thd, FALSE)); + } DBUG_RETURN(error); } diff --git a/sql/handler.h b/sql/handler.h index f759239d66e..ea0b134e53d 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -1,3 +1,6 @@ +#ifndef HANDLER_INCLUDED +#define HANDLER_INCLUDED + /* Copyright 2000-2008 MySQL AB, 2008 Sun Microsystems, Inc. This program is free software; you can redistribute it and/or modify @@ -214,6 +217,13 @@ #define MAX_HA 15 /* + Use this instead of 0 as the initial value for the slot number of + handlerton, so that we can distinguish uninitialized slot number + from slot 0. +*/ +#define HA_SLOT_UNDEF ((uint)-1) + +/* Parameters for open() (in register form->filestat) HA_GET_INFO does an implicit HA_ABORT_IF_LOCKED */ @@ -2064,3 +2074,4 @@ int ha_binlog_end(THD *thd); #define ha_binlog_wait(a) do {} while (0) #define ha_binlog_end(a) do {} while (0) #endif +#endif /* HANDLER_INCLUDED */ diff --git a/sql/item.h b/sql/item.h index a2cff3ab3a9..b44e84f4b15 100644 --- a/sql/item.h +++ b/sql/item.h @@ -1,3 +1,6 @@ +#ifndef ITEM_INCLUDED +#define ITEM_INCLUDED + /* Copyright 2000-2008 MySQL AB, 2008 Sun Microsystems, Inc. This program is free software; you can redistribute it and/or modify @@ -3126,3 +3129,5 @@ extern Cached_item *new_Cached_item(THD *thd, Item *item); extern Item_result item_cmp_type(Item_result a,Item_result b); extern void resolve_const_item(THD *thd, Item **ref, Item *cmp_item); extern int stored_field_cmp_to_item(Field *field, Item *item); + +#endif /* ITEM_INCLUDED */ diff --git a/sql/item_cmpfunc.h b/sql/item_cmpfunc.h index c2227fa04e0..3462bed94a2 100644 --- a/sql/item_cmpfunc.h +++ b/sql/item_cmpfunc.h @@ -1,3 +1,6 @@ +#ifndef ITEM_CMPFUNC_INCLUDED +#define ITEM_CMPFUNC_INCLUDED + /* Copyright (C) 2000-2003 MySQL AB This program is free software; you can redistribute it and/or modify @@ -1721,3 +1724,5 @@ inline Item *and_conds(Item *a, Item *b) } Item *and_expressions(Item *a, Item *b, Item **org_item); + +#endif /* ITEM_CMPFUNC_INCLUDED */ diff --git a/sql/item_func.h b/sql/item_func.h index fdbbff89e60..628878bcaed 100644 --- a/sql/item_func.h +++ b/sql/item_func.h @@ -1,3 +1,6 @@ +#ifndef ITEM_FUNC_INCLUDED +#define ITEM_FUNC_INCLUDED + /* Copyright 2000-2008 MySQL AB, 2008 Sun Microsystems, Inc. This program is free software; you can redistribute it and/or modify @@ -1718,3 +1721,4 @@ public: bool check_partition_func_processor(uchar *int_arg) {return FALSE;} }; +#endif /* ITEM_FUNC_INCLUDED */ diff --git a/sql/item_geofunc.h b/sql/item_geofunc.h index edbe104e307..9a55ea7d5b1 100644 --- a/sql/item_geofunc.h +++ b/sql/item_geofunc.h @@ -1,3 +1,6 @@ +#ifndef ITEM_GEOFUNC_INCLUDED +#define ITEM_GEOFUNC_INCLUDED + /* Copyright (C) 2000-2003 MySQL AB This program is free software; you can redistribute it and/or modify @@ -386,3 +389,4 @@ public: #endif +#endif /* ITEM_GEOFUNC_INCLUDED */ diff --git a/sql/item_row.h b/sql/item_row.h index 67441f49603..7c08c5888e0 100644 --- a/sql/item_row.h +++ b/sql/item_row.h @@ -1,3 +1,6 @@ +#ifndef ITEM_ROW_INCLUDED +#define ITEM_ROW_INCLUDED + /* Copyright (C) 2000 MySQL AB This program is free software; you can redistribute it and/or modify @@ -77,3 +80,5 @@ public: bool null_inside() { return with_null; }; void bring_value(); }; + +#endif /* ITEM_ROW_INCLUDED */ diff --git a/sql/item_strfunc.h b/sql/item_strfunc.h index 2cdb45100ae..fc70b04ca68 100644 --- a/sql/item_strfunc.h +++ b/sql/item_strfunc.h @@ -1,3 +1,6 @@ +#ifndef ITEM_STRFUNC_INCLUDED +#define ITEM_STRFUNC_INCLUDED + /* Copyright (C) 2000-2003 MySQL AB This program is free software; you can redistribute it and/or modify @@ -842,3 +845,4 @@ public: String *val_str(String *); }; +#endif /* ITEM_STRFUNC_INCLUDED */ diff --git a/sql/item_subselect.h b/sql/item_subselect.h index d4aa621c083..ea59521aab1 100644 --- a/sql/item_subselect.h +++ b/sql/item_subselect.h @@ -1,3 +1,6 @@ +#ifndef ITEM_SUBSELECT_INCLUDED +#define ITEM_SUBSELECT_INCLUDED + /* Copyright (C) 2000 MySQL AB This program is free software; you can redistribute it and/or modify @@ -579,4 +582,4 @@ inline bool Item_subselect::is_uncacheable() const return engine->uncacheable(); } - +#endif /* ITEM_SUBSELECT_INCLUDED */ diff --git a/sql/item_sum.h b/sql/item_sum.h index d991327d847..993ec1597b4 100644 --- a/sql/item_sum.h +++ b/sql/item_sum.h @@ -1,3 +1,6 @@ +#ifndef ITEM_SUM_INCLUDED +#define ITEM_SUM_INCLUDED + /* Copyright (C) 2000-2006 MySQL AB This program is free software; you can redistribute it and/or modify @@ -1279,3 +1282,5 @@ public: virtual bool change_context_processor(uchar *cntx) { context= (Name_resolution_context *)cntx; return FALSE; } }; + +#endif /* ITEM_SUM_INCLUDED */ diff --git a/sql/item_timefunc.h b/sql/item_timefunc.h index 9e3c2e8c89f..7f1b2ed3a53 100644 --- a/sql/item_timefunc.h +++ b/sql/item_timefunc.h @@ -1,3 +1,6 @@ +#ifndef ITEM_TIMEFUNC_INCLUDED +#define ITEM_TIMEFUNC_INCLUDED + /* Copyright (C) 2000-2006 MySQL AB This program is free software; you can redistribute it and/or modify @@ -1026,3 +1029,5 @@ public: const char *func_name() const { return "last_day"; } bool get_date(MYSQL_TIME *res, uint fuzzy_date); }; + +#endif /* ITEM_TIMEFUNC_INCLUDED */ diff --git a/sql/item_xmlfunc.h b/sql/item_xmlfunc.h index dadbb5ccf42..6373bde0aab 100644 --- a/sql/item_xmlfunc.h +++ b/sql/item_xmlfunc.h @@ -1,3 +1,6 @@ +#ifndef ITEM_XMLFUNC_INCLUDED +#define ITEM_XMLFUNC_INCLUDED + /* Copyright (C) 2000-2005 MySQL AB This program is free software; you can redistribute it and/or modify @@ -61,3 +64,4 @@ public: String *val_str(String *); }; +#endif /* ITEM_XMLFUNC_INCLUDED */ diff --git a/sql/lex.h b/sql/lex.h index 0a85824f6f7..5cdb506452e 100644 --- a/sql/lex.h +++ b/sql/lex.h @@ -1,3 +1,6 @@ +#ifndef LEX_INCLUDED +#define LEX_INCLUDED + /* Copyright (C) 2000-2002 MySQL AB This program is free software; you can redistribute it and/or modify @@ -241,6 +244,7 @@ static SYMBOL symbols[] = { { "IDENTIFIED", SYM(IDENTIFIED_SYM)}, { "IF", SYM(IF)}, { "IGNORE", SYM(IGNORE_SYM)}, + { "IGNORE_SERVER_IDS", SYM(IGNORE_SERVER_IDS_SYM)}, { "IMPORT", SYM(IMPORT)}, { "IN", SYM(IN_SYM)}, { "INDEX", SYM(INDEX_SYM)}, @@ -320,6 +324,7 @@ static SYMBOL symbols[] = { { "MASTER_SSL_KEY", SYM(MASTER_SSL_KEY_SYM)}, { "MASTER_SSL_VERIFY_SERVER_CERT", SYM(MASTER_SSL_VERIFY_SERVER_CERT_SYM)}, { "MASTER_USER", SYM(MASTER_USER_SYM)}, + { "MASTER_HEARTBEAT_PERIOD", SYM(MASTER_HEARTBEAT_PERIOD_SYM)}, { "MATCH", SYM(MATCH)}, { "MAX_CONNECTIONS_PER_HOUR", SYM(MAX_CONNECTIONS_PER_HOUR)}, { "MAX_QUERIES_PER_HOUR", SYM(MAX_QUERIES_PER_HOUR)}, @@ -426,6 +431,7 @@ static SYMBOL symbols[] = { { "REDUNDANT", SYM(REDUNDANT_SYM)}, { "REFERENCES", SYM(REFERENCES)}, { "REGEXP", SYM(REGEXP)}, + { "RELAYLOG", SYM(RELAYLOG_SYM)}, { "RELAY_LOG_FILE", SYM(RELAY_LOG_FILE_SYM)}, { "RELAY_LOG_POS", SYM(RELAY_LOG_POS_SYM)}, { "RELAY_THREAD", SYM(RELAY_THREAD)}, @@ -634,3 +640,5 @@ static SYMBOL sql_functions[] = { { "VAR_POP", SYM(VARIANCE_SYM)}, { "VAR_SAMP", SYM(VAR_SAMP_SYM)}, }; + +#endif /* LEX_INCLUDED */ diff --git a/sql/log.cc b/sql/log.cc index 4c30ecb94c1..491030aca34 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -38,6 +38,7 @@ #endif #include <mysql/plugin.h> +#include "rpl_handler.h" /* max size of the log message */ #define MAX_LOG_BUFFER_SIZE 1024 @@ -49,8 +50,7 @@ LOGGER logger; -MYSQL_BIN_LOG mysql_bin_log; -ulong sync_binlog_counter= 0; +MYSQL_BIN_LOG mysql_bin_log(&sync_binlog_period); static bool test_if_number(const char *str, long *res, bool allow_wildcards); @@ -961,6 +961,7 @@ bool LOGGER::slow_log_print(THD *thd, const char *query, uint query_length, uint user_host_len= 0; ulonglong query_utime, lock_utime; + DBUG_ASSERT(thd->enable_slow_log); /* Print the message to the buffer if we have slow log enabled */ @@ -2410,10 +2411,11 @@ const char *MYSQL_LOG::generate_name(const char *log_name, -MYSQL_BIN_LOG::MYSQL_BIN_LOG() +MYSQL_BIN_LOG::MYSQL_BIN_LOG(uint *sync_period) :bytes_written(0), prepared_xids(0), file_id(1), open_count(1), need_start_event(TRUE), m_table_map_version(0), - is_relay_log(0), + sync_period_ptr(sync_period), + is_relay_log(0), signal_cnt(0), description_event_for_exec(0), description_event_for_queue(0) { /* @@ -3643,6 +3645,8 @@ bool MYSQL_BIN_LOG::append(Log_event* ev) } bytes_written+= ev->data_written; DBUG_PRINT("info",("max_size: %lu",max_size)); + if (flush_and_sync(0)) + goto err; if ((uint) my_b_append_tell(&log_file) > max_size) new_file_without_locking(); @@ -3673,6 +3677,8 @@ bool MYSQL_BIN_LOG::appendv(const char* buf, uint len,...) bytes_written += len; } while ((buf=va_arg(args,const char*)) && (len=va_arg(args,uint))); DBUG_PRINT("info",("max_size: %lu",max_size)); + if (flush_and_sync(0)) + goto err; if ((uint) my_b_append_tell(&log_file) > max_size) new_file_without_locking(); @@ -3682,17 +3688,21 @@ err: DBUG_RETURN(error); } - -bool MYSQL_BIN_LOG::flush_and_sync() +bool MYSQL_BIN_LOG::flush_and_sync(bool *synced) { int err=0, fd=log_file.file; + if (synced) + *synced= 0; safe_mutex_assert_owner(&LOCK_log); if (flush_io_cache(&log_file)) return 1; - if (++sync_binlog_counter >= sync_binlog_period && sync_binlog_period) + uint sync_period= get_sync_period(); + if (sync_period && ++sync_counter >= sync_period) { - sync_binlog_counter= 0; + sync_counter= 0; err=my_sync(fd, MYF(MY_WME)); + if (synced) + *synced= 1; } return err; } @@ -3983,7 +3993,7 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, if (file == &log_file) { - error= flush_and_sync(); + error= flush_and_sync(0); if (!error) { signal_update(); @@ -4169,8 +4179,16 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) if (file == &log_file) // we are writing to the real log (disk) { - if (flush_and_sync()) + bool synced= 0; + if (flush_and_sync(&synced)) goto err; + + if (RUN_HOOK(binlog_storage, after_flush, + (thd, log_file_name, file->pos_in_file, synced))) { + sql_print_error("Failed to run 'after_flush' hooks"); + goto err; + } + signal_update(); rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); } @@ -4425,7 +4443,7 @@ int MYSQL_BIN_LOG::write_cache(IO_CACHE *cache, bool lock_log, bool sync_log) DBUG_ASSERT(carry == 0); if (sync_log) - flush_and_sync(); + return flush_and_sync(0); return 0; // All OK } @@ -4472,7 +4490,7 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd, bool lock) ev.write(&log_file); if (lock) { - if (!error && !(error= flush_and_sync())) + if (!error && !(error= flush_and_sync(0))) { signal_update(); rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); @@ -4560,7 +4578,8 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event, if (incident && write_incident(thd, FALSE)) goto err; - if (flush_and_sync()) + bool synced= 0; + if (flush_and_sync(&synced)) goto err; DBUG_EXECUTE_IF("half_binlogged_transaction", abort();); if (cache->error) // Error on read @@ -4569,6 +4588,15 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event, write_error=1; // Don't give more errors goto err; } + + if (RUN_HOOK(binlog_storage, after_flush, + (thd, log_file_name, log_file.pos_in_file, synced))) + { + sql_print_error("Failed to run 'after_flush' hooks"); + write_error=1; + goto err; + } + signal_update(); } @@ -4605,12 +4633,9 @@ err: /** - Wait until we get a signal that the binary log has been updated. + Wait until we get a signal that the relay log has been updated. @param thd Thread variable - @param is_slave If 0, the caller is the Binlog_dump thread from master; - if 1, the caller is the SQL thread from the slave. This - influences only thd->proc_info. @note One must have a lock on LOCK_log before calling this function. @@ -4618,22 +4643,53 @@ err: THD::enter_cond() (see NOTES in sql_class.h). */ -void MYSQL_BIN_LOG::wait_for_update(THD* thd, bool is_slave) +void MYSQL_BIN_LOG::wait_for_update_relay_log(THD* thd) { const char *old_msg; - DBUG_ENTER("wait_for_update"); + DBUG_ENTER("wait_for_update_relay_log"); old_msg= thd->enter_cond(&update_cond, &LOCK_log, - is_slave ? - "Has read all relay log; waiting for the slave I/O " - "thread to update it" : - "Has sent all binlog to slave; waiting for binlog " - "to be updated"); + "Slave has read all relay log; " + "waiting for the slave I/O " + "thread to update it" ); pthread_cond_wait(&update_cond, &LOCK_log); thd->exit_cond(old_msg); DBUG_VOID_RETURN; } +/** + Wait until we get a signal that the binary log has been updated. + Applies to master only. + + NOTES + @param[in] thd a THD struct + @param[in] timeout a pointer to a timespec; + NULL means to wait w/o timeout. + @retval 0 if got signalled on update + @retval non-0 if wait timeout elapsed + @note + LOCK_log must be taken before calling this function. + LOCK_log is being released while the thread is waiting. + LOCK_log is released by the caller. +*/ + +int MYSQL_BIN_LOG::wait_for_update_bin_log(THD* thd, + const struct timespec *timeout) +{ + int ret= 0; + const char* old_msg = thd->proc_info; + DBUG_ENTER("wait_for_update_bin_log"); + old_msg= thd->enter_cond(&update_cond, &LOCK_log, + "Master has sent all binlog to slave; " + "waiting for binlog to be updated"); + if (!timeout) + pthread_cond_wait(&update_cond, &LOCK_log); + else + ret= pthread_cond_timedwait(&update_cond, &LOCK_log, + const_cast<struct timespec *>(timeout)); + DBUG_RETURN(ret); +} + /** Close the log file. @@ -4846,6 +4902,7 @@ bool flush_error_log() void MYSQL_BIN_LOG::signal_update() { DBUG_ENTER("MYSQL_BIN_LOG::signal_update"); + signal_cnt++; pthread_cond_broadcast(&update_cond); DBUG_VOID_RETURN; } diff --git a/sql/log.h b/sql/log.h index d306d6f7182..a31be6dcce6 100644 --- a/sql/log.h +++ b/sql/log.h @@ -269,6 +269,18 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG ulonglong m_table_map_version; + /* pointer to the sync period variable, for binlog this will be + sync_binlog_period, for relay log this will be + sync_relay_log_period + */ + uint *sync_period_ptr; + uint sync_counter; + + inline uint get_sync_period() + { + return *sync_period_ptr; + } + int write_to_file(IO_CACHE *cache); /* This is used to start writing to a new log file. The difference from @@ -284,7 +296,7 @@ public: /* This is relay log */ bool is_relay_log; - + ulong signal_cnt; // update of the counter is checked by heartbeat /* These describe the log's format. This is used only for relay logs. _for_exec is used by the SQL thread, _for_queue by the I/O thread. It's @@ -296,7 +308,7 @@ public: Format_description_log_event *description_event_for_exec, *description_event_for_queue; - MYSQL_BIN_LOG(); + MYSQL_BIN_LOG(uint *sync_period); /* note that there's no destructor ~MYSQL_BIN_LOG() ! The reason is that we don't want it to be automatically called @@ -339,7 +351,8 @@ public: } void set_max_size(ulong max_size_arg); void signal_update(); - void wait_for_update(THD* thd, bool master_or_slave); + void wait_for_update_relay_log(THD* thd); + int wait_for_update_bin_log(THD* thd, const struct timespec * timeout); void set_need_start_event() { need_start_event = 1; } void init(bool no_auto_events_arg, ulong max_size); void init_pthread_objects(); @@ -378,7 +391,20 @@ public: bool is_active(const char* log_file_name); int update_log_index(LOG_INFO* linfo, bool need_update_threads); void rotate_and_purge(uint flags); - bool flush_and_sync(); + /** + Flush binlog cache and synchronize to disk. + + This function flushes events in binlog cache to binary log file, + it will do synchronizing according to the setting of system + variable 'sync_binlog'. If file is synchronized, @c synced will + be set to 1, otherwise 0. + + @param[out] synced if not NULL, set to 1 if file is synchronized, otherwise 0 + + @retval 0 Success + @retval other Failure + */ + bool flush_and_sync(bool *synced); int purge_logs(const char *to_log, bool included, bool need_mutex, bool need_update_threads, ulonglong *decrease_log_space); diff --git a/sql/log_event.cc b/sql/log_event.cc index 2445560e65e..e574be774e3 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -2389,13 +2389,29 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, charset_database_number= thd_arg->variables.collation_database->number; /* - If we don't use flags2 for anything else than options contained in - thd_arg->options, it would be more efficient to flags2=thd_arg->options - (OPTIONS_WRITTEN_TO_BIN_LOG would be used only at reading time). - But it's likely that we don't want to use 32 bits for 3 bits; in the future - we will probably want to reclaim the 29 bits. So we need the &. + We only replicate over the bits of flags2 that we need: the rest + are masked out by "& OPTIONS_WRITTEN_TO_BINLOG". + + We also force AUTOCOMMIT=1. Rationale (cf. BUG#29288): After + fixing BUG#26395, we always write BEGIN and COMMIT around all + transactions (even single statements in autocommit mode). This is + so that replication from non-transactional to transactional table + and error recovery from XA to non-XA table should work as + expected. The BEGIN/COMMIT are added in log.cc. However, there is + one exception: MyISAM bypasses log.cc and writes directly to the + binlog. So if autocommit is off, master has MyISAM, and slave has + a transactional engine, then the slave will just see one long + never-ending transaction. The only way to bypass explicit + BEGIN/COMMIT in the binlog is by using a non-transactional table. + So setting AUTOCOMMIT=1 will make this work as expected. + + Note: explicitly replicate AUTOCOMMIT=1 from master. We do not + assume AUTOCOMMIT=1 on slave; the slave still reads the state of + the autocommit flag as written by the master to the binlog. This + behavior may change after WL#4162 has been implemented. */ - flags2= (uint32) (thd_arg->options & OPTIONS_WRITTEN_TO_BIN_LOG); + flags2= (uint32) (thd_arg->options & + (OPTIONS_WRITTEN_TO_BIN_LOG & ~OPTION_NOT_AUTOCOMMIT)); DBUG_ASSERT(thd_arg->variables.character_set_client->number < 256*256); DBUG_ASSERT(thd_arg->variables.collation_connection->number < 256*256); DBUG_ASSERT(thd_arg->variables.collation_server->number < 256*256); @@ -3256,6 +3272,21 @@ Default database: '%s'. Query: '%s'", */ } /* End of if (db_ok(... */ + {/** + The following failure injecion works in cooperation with tests + setting @@global.debug= 'd,stop_slave_middle_group'. + The sql thread receives the killed status and will proceed + to shutdown trying to finish incomplete events group. + */ + DBUG_EXECUTE_IF("stop_slave_middle_group", + if (strcmp("COMMIT", query) != 0 && + strcmp("BEGIN", query) != 0) + { + if (thd->transaction.all.modified_non_trans_table) + const_cast<Relay_log_info*>(rli)->abort_slave= 1; + };); + } + end: /* Probably we have set thd->query, thd->db, thd->catalog to point to places @@ -3627,6 +3658,7 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver) post_header_len[UPDATE_ROWS_EVENT-1]= post_header_len[DELETE_ROWS_EVENT-1]= 6;); post_header_len[INCIDENT_EVENT-1]= INCIDENT_HEADER_LEN; + post_header_len[HEARTBEAT_LOG_EVENT-1]= 0; // Sanity-check that all post header lengths are initialized. IF_DBUG({ @@ -7454,8 +7486,16 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) thd->transaction.stmt.modified_non_trans_table= TRUE; } // row processing loop - DBUG_EXECUTE_IF("STOP_SLAVE_after_first_Rows_event", - const_cast<Relay_log_info*>(rli)->abort_slave= 1;); + {/** + The following failure injecion works in cooperation with tests + setting @@global.debug= 'd,stop_slave_middle_group'. + The sql thread receives the killed status and will proceed + to shutdown trying to finish incomplete events group. + */ + DBUG_EXECUTE_IF("stop_slave_middle_group", + if (thd->transaction.all.modified_non_trans_table) + const_cast<Relay_log_info*>(rli)->abort_slave= 1;); + } if ((error= do_after_row_operations(rli, error)) && ignored_error_code(convert_handler_error(error, thd, table))) @@ -7499,32 +7539,6 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error); thd->is_slave_error= 1; } - /* - This code would ideally be placed in do_update_pos() instead, but - since we have no access to table there, we do the setting of - last_event_start_time here instead. - */ - else if (table && (table->s->primary_key == MAX_KEY) && - !cache_stmt && get_flags(STMT_END_F) == RLE_NO_FLAGS) - { - /* - ------------ Temporary fix until WL#2975 is implemented --------- - - This event is not the last one (no STMT_END_F). If we stop now - (in case of terminate_slave_thread()), how will we restart? We - have to restart from Table_map_log_event, but as this table is - not transactional, the rows already inserted will still be - present, and idempotency is not guaranteed (no PK) so we risk - that repeating leads to double insert. So we desperately try to - continue, hope we'll eventually leave this buggy situation (by - executing the final Rows_log_event). If we are in a hopeless - wait (reached end of last relay log and nothing gets appended - there), we timeout after one minute, and notify DBA about the - problem. When WL#2975 is implemented, just remove the member - Relay_log_info::last_event_start_time and all its occurrences. - */ - const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0); - } DBUG_RETURN(error); } @@ -8441,13 +8455,17 @@ Rows_log_event::write_row(const Relay_log_info *const rli, auto_afree_ptr<char> key(NULL); /* fill table->record[0] with default values */ - + bool abort_on_warnings= (rli->sql_thd->variables.sql_mode & + (MODE_STRICT_TRANS_TABLES | MODE_STRICT_ALL_TABLES)); if ((error= prepare_record(table, m_width, - TRUE /* check if columns have def. values */))) + table->file->ht->db_type != DB_TYPE_NDBCLUSTER, + abort_on_warnings, m_curr_row == m_rows_buf))) DBUG_RETURN(error); /* unpack row into table->record[0] */ - error= unpack_current_row(rli); // TODO: how to handle errors? + if ((error= unpack_current_row(rli, abort_on_warnings))) + DBUG_RETURN(error); + if (m_curr_row == m_rows_buf) { /* this is the first row to be inserted, we estimate the rows with @@ -9244,8 +9262,12 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli) store_record(m_table,record[1]); + bool abort_on_warnings= (rli->sql_thd->variables.sql_mode & + (MODE_STRICT_TRANS_TABLES | MODE_STRICT_ALL_TABLES)); m_curr_row= m_curr_row_end; - error= unpack_current_row(rli); // this also updates m_curr_row_end + /* this also updates m_curr_row_end */ + if ((error= unpack_current_row(rli, abort_on_warnings))) + return error; /* Now we have the right row to update. The old row (the one we're @@ -9424,3 +9446,16 @@ st_print_event_info::st_print_event_info() open_cached_file(&body_cache, NULL, NULL, 0, flags); } #endif + + +#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) +Heartbeat_log_event::Heartbeat_log_event(const char* buf, uint event_len, + const Format_description_log_event* description_event) + :Log_event(buf, description_event) +{ + uint8 header_size= description_event->common_header_len; + ident_len = event_len - header_size; + set_if_smaller(ident_len,FN_REFLEN-1); + log_ident= buf + header_size; +} +#endif diff --git a/sql/log_event.h b/sql/log_event.h index 8202dddcc76..de171145acd 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -250,6 +250,7 @@ struct sql_ex_info #define EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN (4 + 4 + 4 + 1) #define EXECUTE_LOAD_QUERY_HEADER_LEN (QUERY_HEADER_LEN + EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN) #define INCIDENT_HEADER_LEN 2 +#define HEARTBEAT_HEADER_LEN 0 /* Max number of possible extra bytes in a replication event compared to a packet (i.e. a query) sent from client to master; @@ -575,6 +576,12 @@ enum Log_event_type INCIDENT_EVENT= 26, /* + Heartbeat event to be send by master at its idle time + to ensure master's online status to slave + */ + HEARTBEAT_LOG_EVENT= 27, + + /* Add new events here - right above this comment! Existing events (except ENUM_END_EVENT) should never change their numbers */ @@ -689,6 +696,20 @@ typedef struct st_print_event_info } PRINT_EVENT_INFO; #endif +/** + the struct aggregates two paramenters that identify an event + uniquely in scope of communication of a particular master and slave couple. + I.e there can not be 2 events from the same staying connected master which + have the same coordinates. + @note + Such identifier is not yet unique generally as the event originating master + is resetable. Also the crashed master can be replaced with some other. +*/ +struct event_coordinates +{ + char * file_name; // binlog file name (directories stripped) + my_off_t pos; // event's position in the binlog file +}; /** @class Log_event @@ -3541,12 +3562,16 @@ protected: int write_row(const Relay_log_info *const, const bool); // Unpack the current row into m_table->record[0] - int unpack_current_row(const Relay_log_info *const rli) + int unpack_current_row(const Relay_log_info *const rli, + const bool abort_on_warning= TRUE) { DBUG_ASSERT(m_table); + + bool first_row= (m_curr_row == m_rows_buf); ASSERT_OR_RETURN_ERROR(m_curr_row < m_rows_end, HA_ERR_CORRUPT_EVENT); int const result= ::unpack_row(rli, m_table, m_width, m_curr_row, &m_cols, - &m_curr_row_end, &m_master_reclength); + &m_curr_row_end, &m_master_reclength, + abort_on_warning, first_row); if (m_curr_row_end > m_rows_end) my_error(ER_SLAVE_CORRUPT_EVENT, MYF(0)); ASSERT_OR_RETURN_ERROR(m_curr_row_end <= m_rows_end, HA_ERR_CORRUPT_EVENT); @@ -3916,6 +3941,42 @@ static inline bool copy_event_cache_to_file_and_reinit(IO_CACHE *cache, reinit_io_cache(cache, WRITE_CACHE, 0, FALSE, TRUE); } +#ifndef MYSQL_CLIENT +/***************************************************************************** + + Heartbeat Log Event class + + Replication event to ensure to slave that master is alive. + The event is originated by master's dump thread and sent straight to + slave without being logged. Slave itself does not store it in relay log + but rather uses a data for immediate checks and throws away the event. + + Two members of the class log_ident and Log_event::log_pos comprise + @see the event_coordinates instance. The coordinates that a heartbeat + instance carries correspond to the last event master has sent from + its binlog. + + ****************************************************************************/ +class Heartbeat_log_event: public Log_event +{ +public: + Heartbeat_log_event(const char* buf, uint event_len, + const Format_description_log_event* description_event); + Log_event_type get_type_code() { return HEARTBEAT_LOG_EVENT; } + bool is_valid() const + { + return (log_ident != NULL && + log_pos >= BIN_LOG_HEADER_SIZE); + } + const char * get_log_ident() { return log_ident; } + uint get_ident_len() { return ident_len; } + +private: + const char* log_ident; + uint ident_len; +}; +#endif + /** @} (end of group Replication) */ diff --git a/sql/log_event_old.cc b/sql/log_event_old.cc index 339e1c443bf..e7dda24f60d 100644 --- a/sql/log_event_old.cc +++ b/sql/log_event_old.cc @@ -223,7 +223,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info row_start= row_end; } - DBUG_EXECUTE_IF("STOP_SLAVE_after_first_Rows_event", + DBUG_EXECUTE_IF("stop_slave_middle_group", const_cast<Relay_log_info*>(rli)->abort_slave= 1;); error= do_after_row_operations(table, error); if (!ev->cache_stmt) @@ -266,34 +266,6 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info DBUG_RETURN(error); } - /* - This code would ideally be placed in do_update_pos() instead, but - since we have no access to table there, we do the setting of - last_event_start_time here instead. - */ - if (table && (table->s->primary_key == MAX_KEY) && - !ev->cache_stmt && - ev->get_flags(Old_rows_log_event::STMT_END_F) == Old_rows_log_event::RLE_NO_FLAGS) - { - /* - ------------ Temporary fix until WL#2975 is implemented --------- - - This event is not the last one (no STMT_END_F). If we stop now - (in case of terminate_slave_thread()), how will we restart? We - have to restart from Table_map_log_event, but as this table is - not transactional, the rows already inserted will still be - present, and idempotency is not guaranteed (no PK) so we risk - that repeating leads to double insert. So we desperately try to - continue, hope we'll eventually leave this buggy situation (by - executing the final Old_rows_log_event). If we are in a hopeless - wait (reached end of last relay log and nothing gets appended - there), we timeout after one minute, and notify DBA about the - problem. When WL#2975 is implemented, just remove the member - st_relay_log_info::last_event_start_time and all its occurences. - */ - const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0); - } - DBUG_RETURN(0); } #endif @@ -1741,7 +1713,7 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) } // row processing loop - DBUG_EXECUTE_IF("STOP_SLAVE_after_first_Rows_event", + DBUG_EXECUTE_IF("stop_slave_middle_group", const_cast<Relay_log_info*>(rli)->abort_slave= 1;); error= do_after_row_operations(rli, error); if (!cache_stmt) @@ -1784,33 +1756,6 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) DBUG_RETURN(error); } - /* - This code would ideally be placed in do_update_pos() instead, but - since we have no access to table there, we do the setting of - last_event_start_time here instead. - */ - if (table && (table->s->primary_key == MAX_KEY) && - !cache_stmt && get_flags(STMT_END_F) == RLE_NO_FLAGS) - { - /* - ------------ Temporary fix until WL#2975 is implemented --------- - - This event is not the last one (no STMT_END_F). If we stop now - (in case of terminate_slave_thread()), how will we restart? We - have to restart from Table_map_log_event, but as this table is - not transactional, the rows already inserted will still be - present, and idempotency is not guaranteed (no PK) so we risk - that repeating leads to double insert. So we desperately try to - continue, hope we'll eventually leave this buggy situation (by - executing the final Old_rows_log_event). If we are in a hopeless - wait (reached end of last relay log and nothing gets appended - there), we timeout after one minute, and notify DBA about the - problem. When WL#2975 is implemented, just remove the member - Relay_log_info::last_event_start_time and all its occurrences. - */ - const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0); - } - DBUG_RETURN(0); } diff --git a/sql/message.h b/sql/message.h index 0e7c282d5a1..97d039352b4 100644 --- a/sql/message.h +++ b/sql/message.h @@ -1,3 +1,6 @@ +#ifndef MESSAGE_INCLUDED
+#define MESSAGE_INCLUDED
+
/*
To change or add messages mysqld writes to the Windows error log, run
mc.exe message.mc
@@ -6,6 +9,8 @@ mc.exe can be installed with Windows SDK, some Visual Studio distributions
do not include it.
*/
+
+
//
// Values are 32 bit values layed out as follows:
//
@@ -53,3 +58,5 @@ //
#define MSG_DEFAULT 0xC0000064L
+#endif /* MESSAGE_INCLUDED */
+
diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h index 67242710fb2..507f4b013cc 100644 --- a/sql/mysql_priv.h +++ b/sql/mysql_priv.h @@ -938,100 +938,6 @@ struct Query_cache_query_flags #define query_cache_is_cacheable_query(L) 0 #endif /*HAVE_QUERY_CACHE*/ -/* - Error injector Macros to enable easy testing of recovery after failures - in various error cases. -*/ -#ifndef ERROR_INJECT_SUPPORT - -#define ERROR_INJECT(x) 0 -#define ERROR_INJECT_ACTION(x,action) 0 -#define ERROR_INJECT_CRASH(x) 0 -#define ERROR_INJECT_VALUE(x) 0 -#define ERROR_INJECT_VALUE_ACTION(x,action) 0 -#define ERROR_INJECT_VALUE_CRASH(x) 0 -#define SET_ERROR_INJECT_VALUE(x) - -#else - -inline bool check_and_unset_keyword(const char *dbug_str) -{ - const char *extra_str= "-d,"; - char total_str[200]; - if (_db_strict_keyword_ (dbug_str)) - { - strxmov(total_str, extra_str, dbug_str, NullS); - DBUG_SET(total_str); - return 1; - } - return 0; -} - - -inline bool -check_and_unset_inject_value(int value) -{ - THD *thd= current_thd; - if (thd->error_inject_value == (uint)value) - { - thd->error_inject_value= 0; - return 1; - } - return 0; -} - -/* - ERROR INJECT MODULE: - -------------------- - These macros are used to insert macros from the application code. - The event that activates those error injections can be activated - from SQL by using: - SET SESSION dbug=+d,code; - - After the error has been injected, the macros will automatically - remove the debug code, thus similar to using: - SET SESSION dbug=-d,code - from SQL. - - ERROR_INJECT_CRASH will inject a crash of the MySQL Server if code - is set when macro is called. ERROR_INJECT_CRASH can be used in - if-statements, it will always return FALSE unless of course it - crashes in which case it doesn't return at all. - - ERROR_INJECT_ACTION will inject the action specified in the action - parameter of the macro, before performing the action the code will - be removed such that no more events occur. ERROR_INJECT_ACTION - can also be used in if-statements and always returns FALSE. - ERROR_INJECT can be used in a normal if-statement, where the action - part is performed in the if-block. The macro returns TRUE if the - error was activated and otherwise returns FALSE. If activated the - code is removed. - - Sometimes it is necessary to perform error inject actions as a serie - of events. In this case one can use one variable on the THD object. - Thus one sets this value by using e.g. SET_ERROR_INJECT_VALUE(100). - Then one can later test for it by using ERROR_INJECT_CRASH_VALUE, - ERROR_INJECT_ACTION_VALUE and ERROR_INJECT_VALUE. This have the same - behaviour as the above described macros except that they use the - error inject value instead of a code used by DBUG macros. -*/ -#define SET_ERROR_INJECT_VALUE(x) \ - current_thd->error_inject_value= (x) -#define ERROR_INJECT_CRASH(code) \ - DBUG_EVALUATE_IF(code, (abort(), 0), 0) -#define ERROR_INJECT_ACTION(code, action) \ - (check_and_unset_keyword(code) ? ((action), 0) : 0) -#define ERROR_INJECT(code) \ - check_and_unset_keyword(code) -#define ERROR_INJECT_VALUE(value) \ - check_and_unset_inject_value(value) -#define ERROR_INJECT_VALUE_ACTION(value,action) \ - (check_and_unset_inject_value(value) ? (action) : 0) -#define ERROR_INJECT_VALUE_CRASH(value) \ - ERROR_INJECT_VALUE_ACTION(value, (abort(), 0)) - -#endif - void write_bin_log(THD *thd, bool clear_error, char const *query, ulong query_length); @@ -1961,10 +1867,13 @@ extern ulong MYSQL_PLUGIN_IMPORT specialflag; #endif /* MYSQL_SERVER || INNODB_COMPATIBILITY_HOOKS */ #ifdef MYSQL_SERVER extern ulong current_pid; -extern ulong expire_logs_days, sync_binlog_period, sync_binlog_counter; +extern ulong expire_logs_days; +extern uint sync_binlog_period, sync_relaylog_period, + sync_relayloginfo_period, sync_masterinfo_period; extern ulong opt_tc_log_size, tc_log_max_pages_used, tc_log_page_size; extern ulong tc_log_page_waits; extern my_bool relay_log_purge, opt_innodb_safe_binlog, opt_innodb; +extern my_bool relay_log_recovery; extern uint test_flags,select_errors,ha_open_options; extern uint protocol_version, mysqld_port, dropping_tables; extern uint delay_key_write_options; diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 3e2f8eabd39..80bd6b7b48c 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -31,6 +31,8 @@ #include "rpl_injector.h" +#include "rpl_handler.h" + #ifdef HAVE_SYS_PRCTL_H #include <sys/prctl.h> #endif @@ -477,6 +479,7 @@ extern const char *opt_ndb_distribution; extern enum ndb_distribution opt_ndb_distribution_id; #endif my_bool opt_readonly, use_temp_pool, relay_log_purge; +my_bool relay_log_recovery; my_bool opt_sync_frm, opt_allow_suspicious_udfs; my_bool opt_secure_auth= 0; char* opt_secure_file_priv= 0; @@ -552,7 +555,9 @@ ulong max_prepared_stmt_count; */ ulong prepared_stmt_count=0; ulong thread_id=1L,current_pid; -ulong slow_launch_threads = 0, sync_binlog_period; +ulong slow_launch_threads = 0; +uint sync_binlog_period= 0, sync_relaylog_period= 0, + sync_relayloginfo_period= 0, sync_masterinfo_period= 0; ulong expire_logs_days = 0; ulong rpl_recovery_rank=0; const char *log_output_str= "FILE"; @@ -1284,6 +1289,7 @@ void clean_up(bool print_message) ha_end(); if (tc_log) tc_log->close(); + delegates_destroy(); xid_cache_free(); delete_elements(&key_caches, (void (*)(const char*, uchar*)) free_key_cache); multi_keycache_free(); @@ -2119,15 +2125,14 @@ static void init_signals(void) win_install_sigabrt_handler(); if(opt_console) SetConsoleCtrlHandler(console_event_handler,TRUE); - else - { + /* Avoid MessageBox()es*/ - _CrtSetReportMode(_CRT_WARN, _CRTDBG_MODE_FILE); - _CrtSetReportFile(_CRT_WARN, _CRTDBG_FILE_STDERR); - _CrtSetReportMode(_CRT_ERROR, _CRTDBG_MODE_FILE); - _CrtSetReportFile(_CRT_ERROR, _CRTDBG_FILE_STDERR); - _CrtSetReportMode(_CRT_ASSERT, _CRTDBG_MODE_FILE); - _CrtSetReportFile(_CRT_ASSERT, _CRTDBG_FILE_STDERR); + _CrtSetReportMode(_CRT_WARN, _CRTDBG_MODE_FILE); + _CrtSetReportFile(_CRT_WARN, _CRTDBG_FILE_STDERR); + _CrtSetReportMode(_CRT_ERROR, _CRTDBG_MODE_FILE); + _CrtSetReportFile(_CRT_ERROR, _CRTDBG_FILE_STDERR); + _CrtSetReportMode(_CRT_ASSERT, _CRTDBG_MODE_FILE); + _CrtSetReportFile(_CRT_ASSERT, _CRTDBG_FILE_STDERR); /* Do not use SEM_NOGPFAULTERRORBOX in the following SetErrorMode (), @@ -2136,8 +2141,8 @@ static void init_signals(void) exception filter is not guaranteed to work in all situation (like heap corruption or stack overflow) */ - SetErrorMode(SetErrorMode(0)|SEM_FAILCRITICALERRORS|SEM_NOOPENFILEERRORBOX); - } + SetErrorMode(SetErrorMode(0) | SEM_FAILCRITICALERRORS + | SEM_NOOPENFILEERRORBOX); SetUnhandledExceptionFilter(my_unhandler_exception_filter); } @@ -3136,6 +3141,7 @@ SHOW_VAR com_status_vars[]= { {"show_processlist", (char*) offsetof(STATUS_VAR, com_stat[(uint) SQLCOM_SHOW_PROCESSLIST]), SHOW_LONG_STATUS}, {"show_profile", (char*) offsetof(STATUS_VAR, com_stat[(uint) SQLCOM_SHOW_PROFILE]), SHOW_LONG_STATUS}, {"show_profiles", (char*) offsetof(STATUS_VAR, com_stat[(uint) SQLCOM_SHOW_PROFILES]), SHOW_LONG_STATUS}, + {"show_relaylog_events", (char*) offsetof(STATUS_VAR, com_stat[(uint) SQLCOM_SHOW_RELAYLOG_EVENTS]), SHOW_LONG_STATUS}, {"show_slave_hosts", (char*) offsetof(STATUS_VAR, com_stat[(uint) SQLCOM_SHOW_SLAVE_HOSTS]), SHOW_LONG_STATUS}, {"show_slave_status", (char*) offsetof(STATUS_VAR, com_stat[(uint) SQLCOM_SHOW_SLAVE_STAT]), SHOW_LONG_STATUS}, {"show_status", (char*) offsetof(STATUS_VAR, com_stat[(uint) SQLCOM_SHOW_STATUS]), SHOW_LONG_STATUS}, @@ -3761,6 +3767,13 @@ static int init_server_components() unireg_abort(1); } + /* initialize delegates for extension observers */ + if (delegates_init()) + { + sql_print_error("Initialize extension delegates failed"); + unireg_abort(1); + } + /* need to configure logging before initializing storage engines */ if (opt_update_log) { @@ -3819,17 +3832,17 @@ with --log-bin instead."); } if (opt_log_slave_updates && !opt_bin_log) { - sql_print_error("You need to use --log-bin to make " + sql_print_warning("You need to use --log-bin to make " "--log-slave-updates work."); - unireg_abort(1); } if (!opt_bin_log) { if (opt_binlog_format_id != BINLOG_FORMAT_UNSPEC) { - sql_print_error("You need to use --log-bin to make " - "--binlog-format work."); - unireg_abort(1); + sql_print_warning("You need to use --log-bin to make " + "--binlog-format work."); + + global_system_variables.binlog_format= opt_binlog_format_id; } else { @@ -3851,11 +3864,17 @@ with --log-bin instead."); #ifdef HAVE_REPLICATION if (opt_log_slave_updates && replicate_same_server_id) { - sql_print_error("\ -using --replicate-same-server-id in conjunction with \ + if (opt_bin_log) + { + sql_print_error("using --replicate-same-server-id in conjunction with \ --log-slave-updates is impossible, it would lead to infinite loops in this \ server."); - unireg_abort(1); + unireg_abort(1); + } + else + sql_print_warning("using --replicate-same-server-id in conjunction with \ +--log-slave-updates would lead to infinite loops in this server. However this \ +will be ignored as the --log-bin option is not defined."); } #endif @@ -5599,6 +5618,7 @@ enum options_mysqld OPT_QUERY_CACHE_TYPE, OPT_QUERY_CACHE_WLOCK_INVALIDATE, OPT_RECORD_BUFFER, OPT_RECORD_RND_BUFFER, OPT_DIV_PRECINCREMENT, OPT_RELAY_LOG_SPACE_LIMIT, OPT_RELAY_LOG_PURGE, + OPT_RELAY_LOG_RECOVERY, OPT_SLAVE_NET_TIMEOUT, OPT_SLAVE_COMPRESSED_PROTOCOL, OPT_SLOW_LAUNCH_TIME, OPT_SLAVE_TRANS_RETRIES, OPT_READONLY, OPT_DEBUGGING, OPT_SORT_BUFFER, OPT_TABLE_OPEN_CACHE, OPT_TABLE_DEF_CACHE, @@ -5662,7 +5682,10 @@ enum options_mysqld OPT_SLAVE_EXEC_MODE, OPT_GENERAL_LOG_FILE, OPT_SLOW_QUERY_LOG_FILE, - OPT_IGNORE_BUILTIN_INNODB + OPT_IGNORE_BUILTIN_INNODB, + OPT_SYNC_RELAY_LOG, + OPT_SYNC_RELAY_LOG_INFO, + OPT_SYNC_MASTER_INFO }; @@ -6882,6 +6905,13 @@ The minimum value for this variable is 4096.", (uchar**) &relay_log_purge, (uchar**) &relay_log_purge, 0, GET_BOOL, NO_ARG, 1, 0, 1, 0, 1, 0}, + {"relay_log_recovery", OPT_RELAY_LOG_RECOVERY, + "Enables automatic relay log recovery right after the database startup, " + "which means that the IO Thread starts re-fetching from the master " + "right after the last transaction processed.", + (uchar**) &relay_log_recovery, + (uchar**) &relay_log_recovery, 0, GET_BOOL, NO_ARG, + 0, 0, 1, 0, 1, 0}, {"relay_log_space_limit", OPT_RELAY_LOG_SPACE_LIMIT, "Maximum space to use for all relay logs.", (uchar**) &relay_log_space_limit, @@ -6916,8 +6946,23 @@ The minimum value for this variable is 4096.", {"sync-binlog", OPT_SYNC_BINLOG, "Synchronously flush binary log to disk after every #th event. " "Use 0 (default) to disable synchronous flushing.", - (uchar**) &sync_binlog_period, (uchar**) &sync_binlog_period, 0, GET_ULONG, - REQUIRED_ARG, 0, 0, ULONG_MAX, 0, 1, 0}, + (uchar**) &sync_binlog_period, (uchar**) &sync_binlog_period, 0, GET_UINT, + REQUIRED_ARG, 0, 0, (longlong) UINT_MAX, 0, 1, 0}, + {"sync-relay-log", OPT_SYNC_RELAY_LOG, + "Synchronously flush relay log to disk after every #th event. " + "Use 0 (default) to disable synchronous flushing.", + (uchar**) &sync_relaylog_period, (uchar**) &sync_relaylog_period, 0, GET_UINT, + REQUIRED_ARG, 0, 0, (longlong) UINT_MAX, 0, 1, 0}, + {"sync-relay-log-info", OPT_SYNC_RELAY_LOG_INFO, + "Synchronously flush relay log info to disk after #th transaction. " + "Use 0 (default) to disable synchronous flushing.", + (uchar**) &sync_relayloginfo_period, (uchar**) &sync_relayloginfo_period, 0, GET_UINT, + REQUIRED_ARG, 0, 0, (longlong) UINT_MAX, 0, 1, 0}, + {"sync-master-info", OPT_SYNC_MASTER_INFO, + "Synchronously flush master info to disk after every #th event. " + "Use 0 (default) to disable synchronous flushing.", + (uchar**) &sync_masterinfo_period, (uchar**) &sync_masterinfo_period, 0, GET_UINT, + REQUIRED_ARG, 0, 0, (longlong) UINT_MAX, 0, 1, 0}, {"sync-frm", OPT_SYNC_FRM, "Sync .frm to disk on create. Enabled by default.", (uchar**) &opt_sync_frm, (uchar**) &opt_sync_frm, 0, GET_BOOL, NO_ARG, 1, 0, 0, 0, 0, 0}, @@ -7043,7 +7088,8 @@ static int show_slave_running(THD *thd, SHOW_VAR *var, char *buff) var->type= SHOW_MY_BOOL; pthread_mutex_lock(&LOCK_active_mi); var->value= buff; - *((my_bool *)buff)= (my_bool) (active_mi && active_mi->slave_running && + *((my_bool *)buff)= (my_bool) (active_mi && + active_mi->slave_running == MYSQL_SLAVE_RUN_CONNECT && active_mi->rli.slave_running); pthread_mutex_unlock(&LOCK_active_mi); return 0; @@ -7069,6 +7115,40 @@ static int show_slave_retried_trans(THD *thd, SHOW_VAR *var, char *buff) pthread_mutex_unlock(&LOCK_active_mi); return 0; } + +static int show_slave_received_heartbeats(THD *thd, SHOW_VAR *var, char *buff) +{ + pthread_mutex_lock(&LOCK_active_mi); + if (active_mi) + { + var->type= SHOW_LONGLONG; + var->value= buff; + pthread_mutex_lock(&active_mi->rli.data_lock); + *((longlong *)buff)= active_mi->received_heartbeats; + pthread_mutex_unlock(&active_mi->rli.data_lock); + } + else + var->type= SHOW_UNDEF; + pthread_mutex_unlock(&LOCK_active_mi); + return 0; +} + +static int show_heartbeat_period(THD *thd, SHOW_VAR *var, char *buff) +{ + pthread_mutex_lock(&LOCK_active_mi); + if (active_mi) + { + var->type= SHOW_CHAR; + var->value= buff; + my_sprintf(buff, (buff, "%.3f",active_mi->heartbeat_period)); + } + else + var->type= SHOW_UNDEF; + pthread_mutex_unlock(&LOCK_active_mi); + return 0; +} + + #endif /* HAVE_REPLICATION */ static int show_open_tables(THD *thd, SHOW_VAR *var, char *buff) @@ -7433,6 +7513,8 @@ SHOW_VAR status_vars[]= { {"Slave_open_temp_tables", (char*) &slave_open_temp_tables, SHOW_LONG}, #ifdef HAVE_REPLICATION {"Slave_retried_transactions",(char*) &show_slave_retried_trans, SHOW_FUNC}, + {"Slave_heartbeat_period", (char*) &show_heartbeat_period, SHOW_FUNC}, + {"Slave_received_heartbeats",(char*) &show_slave_received_heartbeats, SHOW_FUNC}, {"Slave_running", (char*) &show_slave_running, SHOW_FUNC}, #endif {"Slow_launch_threads", (char*) &slow_launch_threads, SHOW_LONG}, diff --git a/sql/mysqld_suffix.h b/sql/mysqld_suffix.h index 654d7cf88c1..c7ab212f2a2 100644 --- a/sql/mysqld_suffix.h +++ b/sql/mysqld_suffix.h @@ -1,3 +1,6 @@ +#ifndef MYSQLD_SUFFIX_INCLUDED +#define MYSQLD_SUFFIX_INCLUDED + /* Copyright (C) 2000-2004 MySQL AB This program is free software; you can redistribute it and/or modify @@ -27,3 +30,4 @@ #else #define MYSQL_SERVER_SUFFIX_STR MYSQL_SERVER_SUFFIX_DEF #endif +#endif /* MYSQLD_SUFFIX_INCLUDED */ diff --git a/sql/nt_servc.h b/sql/nt_servc.h index 2f0d07df543..5bee42dedf0 100644 --- a/sql/nt_servc.h +++ b/sql/nt_servc.h @@ -1,3 +1,6 @@ +#ifndef NT_SERVC_INCLUDED +#define NT_SERVC_INCLUDED + /** @file @@ -98,3 +101,5 @@ class NTService }; /* ------------------------- the end -------------------------------------- */ + +#endif /* NT_SERVC_INCLUDED */ diff --git a/sql/partition_element.h b/sql/partition_element.h index 905bc38165b..d67259b9605 100644 --- a/sql/partition_element.h +++ b/sql/partition_element.h @@ -1,3 +1,6 @@ +#ifndef PARTITION_ELEMENT_INCLUDED +#define PARTITION_ELEMENT_INCLUDED + /* Copyright (C) 2006 MySQL AB This program is free software; you can redistribute it and/or modify @@ -97,3 +100,5 @@ public: } ~partition_element() {} }; + +#endif /* PARTITION_ELEMENT_INCLUDED */ diff --git a/sql/partition_info.h b/sql/partition_info.h index 9f438e8260b..b5a6c4a0961 100644 --- a/sql/partition_info.h +++ b/sql/partition_info.h @@ -1,3 +1,6 @@ +#ifndef PARTITION_INFO_INCLUDED +#define PARTITION_INFO_INCLUDED + /* Copyright 2006-2008 MySQL AB, 2008 Sun Microsystems, Inc. This program is free software; you can redistribute it and/or modify @@ -314,3 +317,5 @@ void init_all_partitions_iterator(partition_info *part_info, part_iter->ret_null_part= part_iter->ret_null_part_orig= FALSE; part_iter->get_next= get_next_partition_id_range; } + +#endif /* PARTITION_INFO_INCLUDED */ diff --git a/sql/procedure.h b/sql/procedure.h index ceb586766b1..c6f50493876 100644 --- a/sql/procedure.h +++ b/sql/procedure.h @@ -1,3 +1,6 @@ +#ifndef PROCEDURE_INCLUDED +#define PROCEDURE_INCLUDED + /* Copyright (C) 2000-2005 MySQL AB This program is free software; you can redistribute it and/or modify @@ -149,3 +152,5 @@ public: Procedure *setup_procedure(THD *thd,ORDER *proc_param,select_result *result, List<Item> &field_list,int *error); + +#endif /* PROCEDURE_INCLUDED */ diff --git a/sql/protocol.h b/sql/protocol.h index 251ba6fbc33..a39636a1595 100644 --- a/sql/protocol.h +++ b/sql/protocol.h @@ -1,3 +1,6 @@ +#ifndef PROTOCOL_INCLUDED +#define PROTOCOL_INCLUDED + /* Copyright (C) 2002-2006 MySQL AB This program is free software; you can redistribute it and/or modify @@ -180,3 +183,4 @@ uchar *net_store_data(uchar *to,const uchar *from, size_t length); uchar *net_store_data(uchar *to,int32 from); uchar *net_store_data(uchar *to,longlong from); +#endif /* PROTOCOL_INCLUDED */ diff --git a/sql/repl_failsafe.h b/sql/repl_failsafe.h index 6ff78067aca..bce2c727050 100644 --- a/sql/repl_failsafe.h +++ b/sql/repl_failsafe.h @@ -1,3 +1,6 @@ +#ifndef REPL_FAILSAFE_INCLUDED +#define REPL_FAILSAFE_INCLUDED + /* Copyright (C) 2001-2005 MySQL AB & Sasha This program is free software; you can redistribute it and/or modify @@ -49,3 +52,4 @@ int register_slave(THD* thd, uchar* packet, uint packet_length); void unregister_slave(THD* thd, bool only_mine, bool need_mutex); #endif /* HAVE_REPLICATION */ +#endif /* REPL_FAILSAFE_INCLUDED */ diff --git a/sql/replication.h b/sql/replication.h new file mode 100644 index 00000000000..6b7be58a5b1 --- /dev/null +++ b/sql/replication.h @@ -0,0 +1,490 @@ +/* Copyright (C) 2008 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef REPLICATION_H +#define REPLICATION_H + +#ifdef __cplusplus +extern "C" { +#endif + +/** + Transaction observer flags. +*/ +enum Trans_flags { + /** Transaction is a real transaction */ + TRANS_IS_REAL_TRANS = 1 +}; + +/** + Transaction observer parameter +*/ +typedef struct Trans_param { + uint32 server_id; + uint32 flags; + + /* + The latest binary log file name and position written by current + transaction, if binary log is disabled or no log event has been + written into binary log file by current transaction (events + written into transaction log cache are not counted), these two + member will be zero. + */ + const char *log_file; + my_off_t log_pos; +} Trans_param; + +/** + Observes and extends transaction execution +*/ +typedef struct Trans_observer { + uint32 len; + + /** + This callback is called after transaction commit + + This callback is called right after commit to storage engines for + transactional tables. + + For non-transactional tables, this is called at the end of the + statement, before sending statement status, if the statement + succeeded. + + @note The return value is currently ignored by the server. + + @param param The parameter for transaction observers + + @retval 0 Sucess + @retval 1 Failure + */ + int (*after_commit)(Trans_param *param); + + /** + This callback is called after transaction rollback + + This callback is called right after rollback to storage engines + for transactional tables. + + For non-transactional tables, this is called at the end of the + statement, before sending statement status, if the statement + failed. + + @note The return value is currently ignored by the server. + + @param param The parameter for transaction observers + + @retval 0 Sucess + @retval 1 Failure + */ + int (*after_rollback)(Trans_param *param); +} Trans_observer; + +/** + Binlog storage flags +*/ +enum Binlog_storage_flags { + /** Binary log was sync:ed */ + BINLOG_STORAGE_IS_SYNCED = 1 +}; + +/** + Binlog storage observer parameters + */ +typedef struct Binlog_storage_param { + uint32 server_id; +} Binlog_storage_param; + +/** + Observe binlog logging storage +*/ +typedef struct Binlog_storage_observer { + uint32 len; + + /** + This callback is called after binlog has been flushed + + This callback is called after cached events have been flushed to + binary log file. Whether the binary log file is synchronized to + disk is indicated by the bit BINLOG_STORAGE_IS_SYNCED in @a flags. + + @param param Observer common parameter + @param log_file Binlog file name been updated + @param log_pos Binlog position after update + @param flags flags for binlog storage + + @retval 0 Sucess + @retval 1 Failure + */ + int (*after_flush)(Binlog_storage_param *param, + const char *log_file, my_off_t log_pos, + uint32 flags); +} Binlog_storage_observer; + +/** + Replication binlog transmitter (binlog dump) observer parameter. +*/ +typedef struct Binlog_transmit_param { + uint32 server_id; + uint32 flags; +} Binlog_transmit_param; + +/** + Observe and extends the binlog dumping thread. +*/ +typedef struct Binlog_transmit_observer { + uint32 len; + + /** + This callback is called when binlog dumping starts + + + @param param Observer common parameter + @param log_file Binlog file name to transmit from + @param log_pos Binlog position to transmit from + + @retval 0 Sucess + @retval 1 Failure + */ + int (*transmit_start)(Binlog_transmit_param *param, + const char *log_file, my_off_t log_pos); + + /** + This callback is called when binlog dumping stops + + @param param Observer common parameter + + @retval 0 Sucess + @retval 1 Failure + */ + int (*transmit_stop)(Binlog_transmit_param *param); + + /** + This callback is called to reserve bytes in packet header for event transmission + + This callback is called when resetting transmit packet header to + reserve bytes for this observer in packet header. + + The @a header buffer is allocated by the server code, and @a size + is the size of the header buffer. Each observer can only reserve + a maximum size of @a size in the header. + + @param param Observer common parameter + @param header Pointer of the header buffer + @param size Size of the header buffer + @param len Header length reserved by this observer + + @retval 0 Sucess + @retval 1 Failure + */ + int (*reserve_header)(Binlog_transmit_param *param, + unsigned char *header, + unsigned long size, + unsigned long *len); + + /** + This callback is called before sending an event packet to slave + + @param param Observer common parameter + @param packet Binlog event packet to send + @param len Length of the event packet + @param log_file Binlog file name of the event packet to send + @param log_pos Binlog position of the event packet to send + + @retval 0 Sucess + @retval 1 Failure + */ + int (*before_send_event)(Binlog_transmit_param *param, + unsigned char *packet, unsigned long len, + const char *log_file, my_off_t log_pos ); + + /** + This callback is called after sending an event packet to slave + + @param param Observer common parameter + @param event_buf Binlog event packet buffer sent + @param len length of the event packet buffer + + @retval 0 Sucess + @retval 1 Failure + */ + int (*after_send_event)(Binlog_transmit_param *param, + const char *event_buf, unsigned long len); + + /** + This callback is called after resetting master status + + This is called when executing the command RESET MASTER, and is + used to reset status variables added by observers. + + @param param Observer common parameter + + @retval 0 Sucess + @retval 1 Failure + */ + int (*after_reset_master)(Binlog_transmit_param *param); +} Binlog_transmit_observer; + +/** + Binlog relay IO flags +*/ +enum Binlog_relay_IO_flags { + /** Binary relay log was sync:ed */ + BINLOG_RELAY_IS_SYNCED = 1 +}; + + +/** + Replication binlog relay IO observer parameter +*/ +typedef struct Binlog_relay_IO_param { + uint32 server_id; + + /* Master host, user and port */ + char *host; + char *user; + unsigned int port; + + char *master_log_name; + my_off_t master_log_pos; + + MYSQL *mysql; /* the connection to master */ +} Binlog_relay_IO_param; + +/** + Observes and extends the service of slave IO thread. +*/ +typedef struct Binlog_relay_IO_observer { + uint32 len; + + /** + This callback is called when slave IO thread starts + + @param param Observer common parameter + + @retval 0 Sucess + @retval 1 Failure + */ + int (*thread_start)(Binlog_relay_IO_param *param); + + /** + This callback is called when slave IO thread stops + + @param param Observer common parameter + + @retval 0 Sucess + @retval 1 Failure + */ + int (*thread_stop)(Binlog_relay_IO_param *param); + + /** + This callback is called before slave requesting binlog transmission from master + + This is called before slave issuing BINLOG_DUMP command to master + to request binlog. + + @param param Observer common parameter + @param flags binlog dump flags + + @retval 0 Sucess + @retval 1 Failure + */ + int (*before_request_transmit)(Binlog_relay_IO_param *param, uint32 flags); + + /** + This callback is called after read an event packet from master + + @param param Observer common parameter + @param packet The event packet read from master + @param len Length of the event packet read from master + @param event_buf The event packet return after process + @param event_len The length of event packet return after process + + @retval 0 Sucess + @retval 1 Failure + */ + int (*after_read_event)(Binlog_relay_IO_param *param, + const char *packet, unsigned long len, + const char **event_buf, unsigned long *event_len); + + /** + This callback is called after written an event packet to relay log + + @param param Observer common parameter + @param event_buf Event packet written to relay log + @param event_len Length of the event packet written to relay log + @param flags flags for relay log + + @retval 0 Sucess + @retval 1 Failure + */ + int (*after_queue_event)(Binlog_relay_IO_param *param, + const char *event_buf, unsigned long event_len, + uint32 flags); + + /** + This callback is called after reset slave relay log IO status + + @param param Observer common parameter + + @retval 0 Sucess + @retval 1 Failure + */ + int (*after_reset_slave)(Binlog_relay_IO_param *param); +} Binlog_relay_IO_observer; + + +/** + Register a transaction observer + + @param observer The transaction observer to register + @param p pointer to the internal plugin structure + + @retval 0 Sucess + @retval 1 Observer already exists +*/ +int register_trans_observer(Trans_observer *observer, void *p); + +/** + Unregister a transaction observer + + @param observer The transaction observer to unregister + @param p pointer to the internal plugin structure + + @retval 0 Sucess + @retval 1 Observer not exists +*/ +int unregister_trans_observer(Trans_observer *observer, void *p); + +/** + Register a binlog storage observer + + @param observer The binlog storage observer to register + @param p pointer to the internal plugin structure + + @retval 0 Sucess + @retval 1 Observer already exists +*/ +int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p); + +/** + Unregister a binlog storage observer + + @param observer The binlog storage observer to unregister + @param p pointer to the internal plugin structure + + @retval 0 Sucess + @retval 1 Observer not exists +*/ +int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p); + +/** + Register a binlog transmit observer + + @param observer The binlog transmit observer to register + @param p pointer to the internal plugin structure + + @retval 0 Sucess + @retval 1 Observer already exists +*/ +int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p); + +/** + Unregister a binlog transmit observer + + @param observer The binlog transmit observer to unregister + @param p pointer to the internal plugin structure + + @retval 0 Sucess + @retval 1 Observer not exists +*/ +int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p); + +/** + Register a binlog relay IO (slave IO thread) observer + + @param observer The binlog relay IO observer to register + @param p pointer to the internal plugin structure + + @retval 0 Sucess + @retval 1 Observer already exists +*/ +int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p); + +/** + Unregister a binlog relay IO (slave IO thread) observer + + @param observer The binlog relay IO observer to unregister + @param p pointer to the internal plugin structure + + @retval 0 Sucess + @retval 1 Observer not exists +*/ +int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p); + +/** + Connect to master + + This function can only used in the slave I/O thread context, and + will use the same master information to do the connection. + + @code + MYSQL *mysql = mysql_init(NULL); + if (rpl_connect_master(mysql)) + { + // do stuff with the connection + } + mysql_close(mysql); // close the connection + @endcode + + @param mysql address of MYSQL structure to use, pass NULL will + create a new one + + @return address of MYSQL structure on success, NULL on failure +*/ +MYSQL *rpl_connect_master(MYSQL *mysql); + +/** + Set thread entering a condition + + This function should be called before putting a thread to wait for + a condition. @a mutex should be held before calling this + function. After being waken up, @f thd_exit_cond should be called. + + @param thd The thread entering the condition, NULL means current thread + @param cond The condition the thread is going to wait for + @param mutex The mutex associated with the condition, this must be + held before call this function + @param msg The new process message for the thread +*/ +const char* thd_enter_cond(MYSQL_THD thd, pthread_cond_t *cond, + pthread_mutex_t *mutex, const char *msg); + +/** + Set thread leaving a condition + + This function should be called after a thread being waken up for a + condition. + + @param thd The thread entering the condition, NULL means current thread + @param old_msg The process message, ususally this should be the old process + message before calling @f thd_enter_cond +*/ +void thd_exit_cond(MYSQL_THD thd, const char *old_msg); + + +#ifdef __cplusplus +} +#endif +#endif /* REPLICATION_H */ diff --git a/sql/rpl_handler.cc b/sql/rpl_handler.cc new file mode 100644 index 00000000000..da7aade5b99 --- /dev/null +++ b/sql/rpl_handler.cc @@ -0,0 +1,493 @@ +/* Copyright (C) 2008 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "mysql_priv.h" + +#include "rpl_mi.h" +#include "sql_repl.h" +#include "log_event.h" +#include "rpl_filter.h" +#include <my_dir.h> +#include "rpl_handler.h" + +Trans_delegate *transaction_delegate; +Binlog_storage_delegate *binlog_storage_delegate; +#ifdef HAVE_REPLICATION +Binlog_transmit_delegate *binlog_transmit_delegate; +Binlog_relay_IO_delegate *binlog_relay_io_delegate; +#endif /* HAVE_REPLICATION */ + +/* + structure to save transaction log filename and position +*/ +typedef struct Trans_binlog_info { + my_off_t log_pos; + char log_file[FN_REFLEN]; +} Trans_binlog_info; + +static pthread_key(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO); + +int get_user_var_int(const char *name, + long long int *value, int *null_value) +{ + my_bool null_val; + user_var_entry *entry= + (user_var_entry*) hash_search(¤t_thd->user_vars, + (uchar*) name, strlen(name)); + if (!entry) + return 1; + *value= entry->val_int(&null_val); + if (null_value) + *null_value= null_val; + return 0; +} + +int get_user_var_real(const char *name, + double *value, int *null_value) +{ + my_bool null_val; + user_var_entry *entry= + (user_var_entry*) hash_search(¤t_thd->user_vars, + (uchar*) name, strlen(name)); + if (!entry) + return 1; + *value= entry->val_real(&null_val); + if (null_value) + *null_value= null_val; + return 0; +} + +int get_user_var_str(const char *name, char *value, + size_t len, unsigned int precision, int *null_value) +{ + String str; + my_bool null_val; + user_var_entry *entry= + (user_var_entry*) hash_search(¤t_thd->user_vars, + (uchar*) name, strlen(name)); + if (!entry) + return 1; + entry->val_str(&null_val, &str, precision); + strncpy(value, str.c_ptr(), len); + if (null_value) + *null_value= null_val; + return 0; +} + +int delegates_init() +{ + static unsigned long trans_mem[sizeof(Trans_delegate) / sizeof(unsigned long) + 1]; + static unsigned long storage_mem[sizeof(Binlog_storage_delegate) / sizeof(unsigned long) + 1]; +#ifdef HAVE_REPLICATION + static unsigned long transmit_mem[sizeof(Binlog_transmit_delegate) / sizeof(unsigned long) + 1]; + static unsigned long relay_io_mem[sizeof(Binlog_relay_IO_delegate)/ sizeof(unsigned long) + 1]; +#endif + + if (!(transaction_delegate= new (trans_mem) Trans_delegate) + || (!transaction_delegate->is_inited()) + || !(binlog_storage_delegate= new (storage_mem) Binlog_storage_delegate) + || (!binlog_storage_delegate->is_inited()) +#ifdef HAVE_REPLICATION + || !(binlog_transmit_delegate= new (transmit_mem) Binlog_transmit_delegate) + || (!binlog_transmit_delegate->is_inited()) + || !(binlog_relay_io_delegate= new (relay_io_mem) Binlog_relay_IO_delegate) + || (!binlog_relay_io_delegate->is_inited()) +#endif /* HAVE_REPLICATION */ + ) + return 1; + + if (pthread_key_create(&RPL_TRANS_BINLOG_INFO, NULL)) + return 1; + return 0; +} + +void delegates_destroy() +{ + if (transaction_delegate) + transaction_delegate->~Trans_delegate(); + if (binlog_storage_delegate) + binlog_storage_delegate->~Binlog_storage_delegate(); +#ifdef HAVE_REPLICATION + if (binlog_transmit_delegate) + binlog_transmit_delegate->~Binlog_transmit_delegate(); + if (binlog_relay_io_delegate) + binlog_relay_io_delegate->~Binlog_relay_IO_delegate(); +#endif /* HAVE_REPLICATION */ +} + +/* + This macro is used by almost all the Delegate methods to iterate + over all the observers running given callback function of the + delegate . + + Add observer plugins to the thd->lex list, after each statement, all + plugins add to thd->lex will be automatically unlocked. + */ +#define FOREACH_OBSERVER(r, f, thd, args) \ + param.server_id= thd->server_id; \ + read_lock(); \ + Observer_info_iterator iter= observer_info_iter(); \ + Observer_info *info= iter++; \ + for (; info; info= iter++) \ + { \ + plugin_ref plugin= \ + my_plugin_lock(thd, &info->plugin); \ + if (!plugin) \ + { \ + r= 1; \ + break; \ + } \ + if (((Observer *)info->observer)->f \ + && ((Observer *)info->observer)->f args) \ + { \ + r= 1; \ + plugin_unlock(thd, plugin); \ + break; \ + } \ + plugin_unlock(thd, plugin); \ + } \ + unlock() + + +int Trans_delegate::after_commit(THD *thd, bool all) +{ + Trans_param param; + bool is_real_trans= (all || thd->transaction.all.ha_list == 0); + if (is_real_trans) + param.flags |= TRANS_IS_REAL_TRANS; + + Trans_binlog_info *log_info= + my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO); + + param.log_file= log_info ? log_info->log_file : 0; + param.log_pos= log_info ? log_info->log_pos : 0; + + int ret= 0; + FOREACH_OBSERVER(ret, after_commit, thd, (¶m)); + + /* + This is the end of a real transaction or autocommit statement, we + can free the memory allocated for binlog file and position. + */ + if (is_real_trans && log_info) + { + my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL); + my_free(log_info, MYF(0)); + } + return ret; +} + +int Trans_delegate::after_rollback(THD *thd, bool all) +{ + Trans_param param; + bool is_real_trans= (all || thd->transaction.all.ha_list == 0); + if (is_real_trans) + param.flags |= TRANS_IS_REAL_TRANS; + + Trans_binlog_info *log_info= + my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO); + + param.log_file= log_info ? log_info->log_file : 0; + param.log_pos= log_info ? log_info->log_pos : 0; + + int ret= 0; + FOREACH_OBSERVER(ret, after_commit, thd, (¶m)); + + /* + This is the end of a real transaction or autocommit statement, we + can free the memory allocated for binlog file and position. + */ + if (is_real_trans && log_info) + { + my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL); + my_free(log_info, MYF(0)); + } + return ret; +} + +int Binlog_storage_delegate::after_flush(THD *thd, + const char *log_file, + my_off_t log_pos, + bool synced) +{ + Binlog_storage_param param; + uint32 flags=0; + if (synced) + flags |= BINLOG_STORAGE_IS_SYNCED; + + Trans_binlog_info *log_info= + my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO); + + if (!log_info) + { + if(!(log_info= + (Trans_binlog_info *)my_malloc(sizeof(Trans_binlog_info), MYF(0)))) + return 1; + my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, log_info); + } + + strcpy(log_info->log_file, log_file+dirname_length(log_file)); + log_info->log_pos = log_pos; + + int ret= 0; + FOREACH_OBSERVER(ret, after_flush, thd, + (¶m, log_info->log_file, log_info->log_pos, flags)); + return ret; +} + +#ifdef HAVE_REPLICATION +int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags, + const char *log_file, + my_off_t log_pos) +{ + Binlog_transmit_param param; + param.flags= flags; + + int ret= 0; + FOREACH_OBSERVER(ret, transmit_start, thd, (¶m, log_file, log_pos)); + return ret; +} + +int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags) +{ + Binlog_transmit_param param; + param.flags= flags; + + int ret= 0; + FOREACH_OBSERVER(ret, transmit_stop, thd, (¶m)); + return ret; +} + +int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags, + String *packet) +{ + /* NOTE2ME: Maximum extra header size for each observer, I hope 32 + bytes should be enough for each Observer to reserve their extra + header. If later found this is not enough, we can increase this + /HEZX + */ +#define RESERVE_HEADER_SIZE 32 + unsigned char header[RESERVE_HEADER_SIZE]; + ulong hlen; + Binlog_transmit_param param; + param.flags= flags; + param.server_id= thd->server_id; + + int ret= 0; + read_lock(); + Observer_info_iterator iter= observer_info_iter(); + Observer_info *info= iter++; + for (; info; info= iter++) + { + plugin_ref plugin= + my_plugin_lock(thd, &info->plugin); + if (!plugin) + { + ret= 1; + break; + } + hlen= 0; + if (((Observer *)info->observer)->reserve_header + && ((Observer *)info->observer)->reserve_header(¶m, + header, + RESERVE_HEADER_SIZE, + &hlen)) + { + ret= 1; + plugin_unlock(thd, plugin); + break; + } + plugin_unlock(thd, plugin); + if (hlen == 0) + continue; + if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen)) + { + ret= 1; + break; + } + } + unlock(); + return ret; +} + +int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags, + String *packet, + const char *log_file, + my_off_t log_pos) +{ + Binlog_transmit_param param; + param.flags= flags; + + int ret= 0; + FOREACH_OBSERVER(ret, before_send_event, thd, + (¶m, (uchar *)packet->c_ptr(), + packet->length(), + log_file+dirname_length(log_file), log_pos)); + return ret; +} + +int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags, + String *packet) +{ + Binlog_transmit_param param; + param.flags= flags; + + int ret= 0; + FOREACH_OBSERVER(ret, after_send_event, thd, + (¶m, packet->c_ptr(), packet->length())); + return ret; +} + +int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags) + +{ + Binlog_transmit_param param; + param.flags= flags; + + int ret= 0; + FOREACH_OBSERVER(ret, after_reset_master, thd, (¶m)); + return ret; +} + +void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param, + Master_info *mi) +{ + param->mysql= mi->mysql; + param->user= mi->user; + param->host= mi->host; + param->port= mi->port; + param->master_log_name= mi->master_log_name; + param->master_log_pos= mi->master_log_pos; +} + +int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi) +{ + Binlog_relay_IO_param param; + init_param(¶m, mi); + + int ret= 0; + FOREACH_OBSERVER(ret, thread_start, thd, (¶m)); + return ret; +} + + +int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi) +{ + + Binlog_relay_IO_param param; + init_param(¶m, mi); + + int ret= 0; + FOREACH_OBSERVER(ret, thread_stop, thd, (¶m)); + return ret; +} + +int Binlog_relay_IO_delegate::before_request_transmit(THD *thd, + Master_info *mi, + ushort flags) +{ + Binlog_relay_IO_param param; + init_param(¶m, mi); + + int ret= 0; + FOREACH_OBSERVER(ret, before_request_transmit, thd, (¶m, (uint32)flags)); + return ret; +} + +int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi, + const char *packet, ulong len, + const char **event_buf, + ulong *event_len) +{ + Binlog_relay_IO_param param; + init_param(¶m, mi); + + int ret= 0; + FOREACH_OBSERVER(ret, after_read_event, thd, + (¶m, packet, len, event_buf, event_len)); + return ret; +} + +int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi, + const char *event_buf, + ulong event_len, + bool synced) +{ + Binlog_relay_IO_param param; + init_param(¶m, mi); + + uint32 flags=0; + if (synced) + flags |= BINLOG_STORAGE_IS_SYNCED; + + int ret= 0; + FOREACH_OBSERVER(ret, after_queue_event, thd, + (¶m, event_buf, event_len, flags)); + return ret; +} + +int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi) + +{ + Binlog_relay_IO_param param; + init_param(¶m, mi); + + int ret= 0; + FOREACH_OBSERVER(ret, after_reset_slave, thd, (¶m)); + return ret; +} +#endif /* HAVE_REPLICATION */ + +int register_trans_observer(Trans_observer *observer, void *p) +{ + return transaction_delegate->add_observer(observer, (st_plugin_int *)p); +} + +int unregister_trans_observer(Trans_observer *observer, void *p) +{ + return transaction_delegate->remove_observer(observer, (st_plugin_int *)p); +} + +int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p) +{ + return binlog_storage_delegate->add_observer(observer, (st_plugin_int *)p); +} + +int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p) +{ + return binlog_storage_delegate->remove_observer(observer, (st_plugin_int *)p); +} + +#ifdef HAVE_REPLICATION +int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p) +{ + return binlog_transmit_delegate->add_observer(observer, (st_plugin_int *)p); +} + +int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p) +{ + return binlog_transmit_delegate->remove_observer(observer, (st_plugin_int *)p); +} + +int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p) +{ + return binlog_relay_io_delegate->add_observer(observer, (st_plugin_int *)p); +} + +int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p) +{ + return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p); +} +#endif /* HAVE_REPLICATION */ diff --git a/sql/rpl_handler.h b/sql/rpl_handler.h new file mode 100644 index 00000000000..4fb7b4e035b --- /dev/null +++ b/sql/rpl_handler.h @@ -0,0 +1,213 @@ +/* Copyright (C) 2008 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef RPL_HANDLER_H +#define RPL_HANDLER_H + +#include "mysql_priv.h" +#include "rpl_mi.h" +#include "rpl_rli.h" +#include "sql_plugin.h" +#include "replication.h" + +class Observer_info { +public: + void *observer; + st_plugin_int *plugin_int; + plugin_ref plugin; + + Observer_info(void *ob, st_plugin_int *p) + :observer(ob), plugin_int(p) + { + plugin= plugin_int_to_ref(plugin_int); + } +}; + +class Delegate { +public: + typedef List<Observer_info> Observer_info_list; + typedef List_iterator<Observer_info> Observer_info_iterator; + + int add_observer(void *observer, st_plugin_int *plugin) + { + int ret= FALSE; + if (!inited) + return TRUE; + write_lock(); + Observer_info_iterator iter(observer_info_list); + Observer_info *info= iter++; + while (info && info->observer != observer) + info= iter++; + if (!info) + { + info= new Observer_info(observer, plugin); + if (!info || observer_info_list.push_back(info, &memroot)) + ret= TRUE; + } + else + ret= TRUE; + unlock(); + return ret; + } + + int remove_observer(void *observer, st_plugin_int *plugin) + { + int ret= FALSE; + if (!inited) + return TRUE; + write_lock(); + Observer_info_iterator iter(observer_info_list); + Observer_info *info= iter++; + while (info && info->observer != observer) + info= iter++; + if (info) + iter.remove(); + else + ret= TRUE; + unlock(); + return ret; + } + + inline Observer_info_iterator observer_info_iter() + { + return Observer_info_iterator(observer_info_list); + } + + inline bool is_empty() + { + return observer_info_list.is_empty(); + } + + inline int read_lock() + { + if (!inited) + return TRUE; + return rw_rdlock(&lock); + } + + inline int write_lock() + { + if (!inited) + return TRUE; + return rw_wrlock(&lock); + } + + inline int unlock() + { + if (!inited) + return TRUE; + return rw_unlock(&lock); + } + + inline bool is_inited() + { + return inited; + } + + Delegate() + { + inited= FALSE; + if (my_rwlock_init(&lock, NULL)) + return; + init_sql_alloc(&memroot, 1024, 0); + inited= TRUE; + } + ~Delegate() + { + inited= FALSE; + rwlock_destroy(&lock); + free_root(&memroot, MYF(0)); + } + +private: + Observer_info_list observer_info_list; + rw_lock_t lock; + MEM_ROOT memroot; + bool inited; +}; + +class Trans_delegate + :public Delegate { +public: + typedef Trans_observer Observer; + int before_commit(THD *thd, bool all); + int before_rollback(THD *thd, bool all); + int after_commit(THD *thd, bool all); + int after_rollback(THD *thd, bool all); +}; + +class Binlog_storage_delegate + :public Delegate { +public: + typedef Binlog_storage_observer Observer; + int after_flush(THD *thd, const char *log_file, + my_off_t log_pos, bool synced); +}; + +#ifdef HAVE_REPLICATION +class Binlog_transmit_delegate + :public Delegate { +public: + typedef Binlog_transmit_observer Observer; + int transmit_start(THD *thd, ushort flags, + const char *log_file, my_off_t log_pos); + int transmit_stop(THD *thd, ushort flags); + int reserve_header(THD *thd, ushort flags, String *packet); + int before_send_event(THD *thd, ushort flags, + String *packet, const + char *log_file, my_off_t log_pos ); + int after_send_event(THD *thd, ushort flags, + String *packet); + int after_reset_master(THD *thd, ushort flags); +}; + +class Binlog_relay_IO_delegate + :public Delegate { +public: + typedef Binlog_relay_IO_observer Observer; + int thread_start(THD *thd, Master_info *mi); + int thread_stop(THD *thd, Master_info *mi); + int before_request_transmit(THD *thd, Master_info *mi, ushort flags); + int after_read_event(THD *thd, Master_info *mi, + const char *packet, ulong len, + const char **event_buf, ulong *event_len); + int after_queue_event(THD *thd, Master_info *mi, + const char *event_buf, ulong event_len, + bool synced); + int after_reset_slave(THD *thd, Master_info *mi); +private: + void init_param(Binlog_relay_IO_param *param, Master_info *mi); +}; +#endif /* HAVE_REPLICATION */ + +int delegates_init(); +void delegates_destroy(); + +extern Trans_delegate *transaction_delegate; +extern Binlog_storage_delegate *binlog_storage_delegate; +#ifdef HAVE_REPLICATION +extern Binlog_transmit_delegate *binlog_transmit_delegate; +extern Binlog_relay_IO_delegate *binlog_relay_io_delegate; +#endif /* HAVE_REPLICATION */ + +/* + if there is no observers in the delegate, we can return 0 + immediately. +*/ +#define RUN_HOOK(group, hook, args) \ + (group ##_delegate->is_empty() ? \ + 0 : group ##_delegate->hook args) + +#endif /* RPL_HANDLER_H */ diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc index 5e46837e948..e83e0ad0ba9 100644 --- a/sql/rpl_mi.cc +++ b/sql/rpl_mi.cc @@ -26,17 +26,21 @@ int init_intvar_from_file(int* var, IO_CACHE* f, int default_val); int init_strvar_from_file(char *var, int max_size, IO_CACHE *f, const char *default_val); +int init_floatvar_from_file(float* var, IO_CACHE* f, float default_val); +int init_dynarray_intvar_from_file(DYNAMIC_ARRAY* arr, IO_CACHE* f); -Master_info::Master_info() +Master_info::Master_info(bool is_slave_recovery) :Slave_reporting_capability("I/O"), ssl(0), ssl_verify_server_cert(0), fd(-1), io_thd(0), inited(0), - abort_slave(0),slave_running(0), - slave_run_id(0) + rli(is_slave_recovery), abort_slave(0), slave_running(0), + slave_run_id(0), sync_counter(0), + heartbeat_period(0), received_heartbeats(0), master_id(0) { host[0] = 0; user[0] = 0; password[0] = 0; ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0; ssl_cipher[0]= 0; ssl_key[0]= 0; + my_init_dynamic_array(&ignore_server_ids, sizeof(::server_id), 16, 16); bzero((char*) &file, sizeof(file)); pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST); pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST); @@ -47,6 +51,7 @@ Master_info::Master_info() Master_info::~Master_info() { + delete_dynamic(&ignore_server_ids); pthread_mutex_destroy(&run_lock); pthread_mutex_destroy(&data_lock); pthread_cond_destroy(&data_cond); @@ -54,6 +59,43 @@ Master_info::~Master_info() pthread_cond_destroy(&stop_cond); } +/** + A comparison function to be supplied as argument to @c sort_dynamic() + and @c bsearch() + + @return -1 if first argument is less, 0 if it equal to, 1 if it is greater + than the second +*/ +int change_master_server_id_cmp(ulong *id1, ulong *id2) +{ + return *id1 < *id2? -1 : (*id1 > *id2? 1 : 0); +} + + +/** + Reports if the s_id server has been configured to ignore events + it generates with + + CHANGE MASTER IGNORE_SERVER_IDS= ( list of server ids ) + + Method is called from the io thread event receiver filtering. + + @param s_id the master server identifier + + @retval TRUE if s_id is in the list of ignored master servers, + @retval FALSE otherwise. + */ +bool Master_info::shall_ignore_server_id(ulong s_id) +{ + if (likely(ignore_server_ids.elements == 1)) + return (* (ulong*) dynamic_array_ptr(&ignore_server_ids, 0)) == s_id; + else + return bsearch((const ulong *) &s_id, + ignore_server_ids.buffer, + ignore_server_ids.elements, sizeof(ulong), + (int (*) (const void*, const void*)) change_master_server_id_cmp) + != NULL; +} void init_master_info_with_options(Master_info* mi) { @@ -84,6 +126,17 @@ void init_master_info_with_options(Master_info* mi) strmake(mi->ssl_key, master_ssl_key, sizeof(mi->ssl_key)-1); /* Intentionally init ssl_verify_server_cert to 0, no option available */ mi->ssl_verify_server_cert= 0; + /* + always request heartbeat unless master_heartbeat_period is set + explicitly zero. Here is the default value for heartbeat period + if CHANGE MASTER did not specify it. (no data loss in conversion + as hb period has a max) + */ + mi->heartbeat_period= (float) min(SLAVE_MAX_HEARTBEAT_PERIOD, + (slave_net_timeout/2.0)); + DBUG_ASSERT(mi->heartbeat_period > (float) 0.001 + || mi->heartbeat_period == 0); + DBUG_VOID_RETURN; } @@ -93,9 +146,12 @@ enum { /* 5.1.16 added value of master_ssl_verify_server_cert */ LINE_FOR_MASTER_SSL_VERIFY_SERVER_CERT= 15, - + /* 6.0 added value of master_heartbeat_period */ + LINE_FOR_MASTER_HEARTBEAT_PERIOD= 16, + /* 6.0 added value of master_ignore_server_id */ + LINE_FOR_REPLICATE_IGNORE_SERVER_IDS= 17, /* Number of lines currently used when saving master info file */ - LINES_IN_MASTER_INFO= LINE_FOR_MASTER_SSL_VERIFY_SERVER_CERT + LINES_IN_MASTER_INFO= LINE_FOR_REPLICATE_IGNORE_SERVER_IDS }; int init_master_info(Master_info* mi, const char* master_info_fname, @@ -197,6 +253,7 @@ file '%s')", fname); mi->fd = fd; int port, connect_retry, master_log_pos, lines; int ssl= 0, ssl_verify_server_cert= 0; + float master_heartbeat_period= 0.0; char *first_non_digit; /* @@ -281,7 +338,23 @@ file '%s')", fname); if (lines >= LINE_FOR_MASTER_SSL_VERIFY_SERVER_CERT && init_intvar_from_file(&ssl_verify_server_cert, &mi->file, 0)) goto errwithmsg; - + /* + Starting from 6.0 master_heartbeat_period might be + in the file + */ + if (lines >= LINE_FOR_MASTER_HEARTBEAT_PERIOD && + init_floatvar_from_file(&master_heartbeat_period, &mi->file, 0.0)) + goto errwithmsg; + /* + Starting from 6.0 list of server_id of ignorable servers might be + in the file + */ + if (lines >= LINE_FOR_REPLICATE_IGNORE_SERVER_IDS && + init_dynarray_intvar_from_file(&mi->ignore_server_ids, &mi->file)) + { + sql_print_error("Failed to initialize master info ignore_server_ids"); + goto errwithmsg; + } } #ifndef HAVE_OPENSSL @@ -300,6 +373,7 @@ file '%s')", fname); mi->connect_retry= (uint) connect_retry; mi->ssl= (my_bool) ssl; mi->ssl_verify_server_cert= ssl_verify_server_cert; + mi->heartbeat_period= master_heartbeat_period; } DBUG_PRINT("master_info",("log_file_name: %s position: %ld", mi->master_log_name, @@ -310,6 +384,7 @@ file '%s')", fname); goto err; mi->inited = 1; + mi->rli.is_relay_log_recovery= FALSE; // now change cache READ -> WRITE - must do this before flush_master_info reinit_io_cache(&mi->file, WRITE_CACHE, 0L, 0, 1); if ((error=test(flush_master_info(mi, 1)))) @@ -342,6 +417,7 @@ int flush_master_info(Master_info* mi, bool flush_relay_log_cache) { IO_CACHE* file = &mi->file; char lbuf[22]; + int err= 0; DBUG_ENTER("flush_master_info"); DBUG_PRINT("enter",("master_pos: %ld", (long) mi->master_log_pos)); @@ -358,9 +434,35 @@ int flush_master_info(Master_info* mi, bool flush_relay_log_cache) When we come to this place in code, relay log may or not be initialized; the caller is responsible for setting 'flush_relay_log_cache' accordingly. */ - if (flush_relay_log_cache && - flush_io_cache(mi->rli.relay_log.get_log_file())) - DBUG_RETURN(2); + if (flush_relay_log_cache) + { + IO_CACHE *log_file= mi->rli.relay_log.get_log_file(); + if (flush_io_cache(log_file)) + DBUG_RETURN(2); + } + + /* + produce a line listing the total number and all the ignored server_id:s + */ + char* ignore_server_ids_buf; + { + ignore_server_ids_buf= + (char *) my_malloc((sizeof(::server_id) * 3 + 1) * + (1 + mi->ignore_server_ids.elements), MYF(MY_WME)); + if (!ignore_server_ids_buf) + DBUG_RETURN(1); + for (ulong i= 0, cur_len= my_sprintf(ignore_server_ids_buf, + (ignore_server_ids_buf, "%u", + mi->ignore_server_ids.elements)); + i < mi->ignore_server_ids.elements; i++) + { + ulong s_id; + get_dynamic(&mi->ignore_server_ids, (uchar*) &s_id, i); + cur_len +=my_sprintf(ignore_server_ids_buf + cur_len, + (ignore_server_ids_buf + cur_len, + " %lu", s_id)); + } + } /* We flushed the relay log BEFORE the master.info file, because if we crash @@ -378,17 +480,27 @@ int flush_master_info(Master_info* mi, bool flush_relay_log_cache) contents of file). But because of number of lines in the first line of file we don't care about this garbage. */ - + char heartbeat_buf[sizeof(mi->heartbeat_period) * 4]; // buffer to suffice always + my_sprintf(heartbeat_buf, (heartbeat_buf, "%.3f", mi->heartbeat_period)); my_b_seek(file, 0L); my_b_printf(file, - "%u\n%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n%s\n%s\n%s\n%s\n%s\n%d\n", + "%u\n%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n%s\n%s\n%s\n%s\n%s\n%d\n%s\n%s\n", LINES_IN_MASTER_INFO, mi->master_log_name, llstr(mi->master_log_pos, lbuf), mi->host, mi->user, mi->password, mi->port, mi->connect_retry, (int)(mi->ssl), mi->ssl_ca, mi->ssl_capath, mi->ssl_cert, - mi->ssl_cipher, mi->ssl_key, mi->ssl_verify_server_cert); - DBUG_RETURN(-flush_io_cache(file)); + mi->ssl_cipher, mi->ssl_key, mi->ssl_verify_server_cert, + heartbeat_buf, ignore_server_ids_buf); + my_free(ignore_server_ids_buf, MYF(0)); + err= flush_io_cache(file); + if (sync_masterinfo_period && !err && + ++(mi->sync_counter) >= sync_masterinfo_period) + { + err= my_sync(mi->fd, MYF(MY_WME)); + mi->sync_counter= 0; + } + DBUG_RETURN(-err); } diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h index 93fb0a98198..f822a6bc1b1 100644 --- a/sql/rpl_mi.h +++ b/sql/rpl_mi.h @@ -20,6 +20,7 @@ #include "rpl_rli.h" #include "rpl_reporting.h" +#include "my_sys.h" /***************************************************************************** @@ -58,8 +59,9 @@ class Master_info : public Slave_reporting_capability { public: - Master_info(); + Master_info(bool is_slave_recovery); ~Master_info(); + bool shall_ignore_server_id(ulong s_id); /* the variables below are needed because we can change masters on the fly */ char master_log_name[FN_REFLEN]; @@ -100,6 +102,16 @@ class Master_info : public Slave_reporting_capability */ long clock_diff_with_master; + /* + Keeps track of the number of events before fsyncing. + The option --sync-master-info determines how many + events should happen before fsyncing. + */ + uint sync_counter; + float heartbeat_period; // interface with CHANGE MASTER or master.info + ulonglong received_heartbeats; // counter of received heartbeat events + DYNAMIC_ARRAY ignore_server_ids; + ulong master_id; }; void init_master_info_with_options(Master_info* mi); @@ -109,6 +121,7 @@ int init_master_info(Master_info* mi, const char* master_info_fname, int thread_mask); void end_master_info(Master_info* mi); int flush_master_info(Master_info* mi, bool flush_relay_log_cache); +int change_master_server_id_cmp(ulong *id1, ulong *id2); #endif /* HAVE_REPLICATION */ #endif /* RPL_MI_H */ diff --git a/sql/rpl_record.cc b/sql/rpl_record.cc index 14a80cbb4b6..8e80620dd2c 100644 --- a/sql/rpl_record.cc +++ b/sql/rpl_record.cc @@ -180,7 +180,8 @@ int unpack_row(Relay_log_info const *rli, TABLE *table, uint const colcnt, uchar const *const row_data, MY_BITMAP const *cols, - uchar const **const row_end, ulong *const master_reclength) + uchar const **const row_end, ulong *const master_reclength, + const bool abort_on_warning, const bool first_row) { DBUG_ENTER("unpack_row"); DBUG_ASSERT(row_data); @@ -224,8 +225,35 @@ unpack_row(Relay_log_info const *rli, /* Field...::unpack() cannot return 0 */ DBUG_ASSERT(pack_ptr != NULL); - if ((null_bits & null_mask) && f->maybe_null()) - f->set_null(); + if (null_bits & null_mask) + { + if (f->maybe_null()) + { + DBUG_PRINT("debug", ("Was NULL; null mask: 0x%x; null bits: 0x%x", + null_mask, null_bits)); + f->set_null(); + } + else + { + MYSQL_ERROR::enum_warning_level error_type= + MYSQL_ERROR::WARN_LEVEL_NOTE; + if (abort_on_warning && (table->file->has_transactions() || + first_row)) + { + error = HA_ERR_ROWS_EVENT_APPLY; + error_type= MYSQL_ERROR::WARN_LEVEL_ERROR; + } + else + { + f->set_default(); + error_type= MYSQL_ERROR::WARN_LEVEL_WARN; + } + push_warning_printf(current_thd, error_type, + ER_BAD_NULL_ERROR, + ER(ER_BAD_NULL_ERROR), + f->field_name); + } + } else { f->set_notnull(); @@ -305,13 +333,17 @@ unpack_row(Relay_log_info const *rli, @param table Table whose record[0] buffer is prepared. @param skip Number of columns for which default/nullable check should be skipped. - @param check Indicates if errors should be raised when checking - default/nullable field properties. + @param check Specifies if lack of default error needs checking. + @param abort_on_warning + Controls how to react on lack of a field's default. + The parameter mimics the master side one for + @c check_that_all_fields_are_given_values. @returns 0 on success or a handler level error code */ int prepare_record(TABLE *const table, - const uint skip, const bool check) + const uint skip, const bool check, + const bool abort_on_warning, const bool first_row) { DBUG_ENTER("prepare_record"); @@ -326,17 +358,37 @@ int prepare_record(TABLE *const table, if (skip >= table->s->fields || !check) DBUG_RETURN(0); - /* Checking if exists default/nullable fields in the default values. */ - - for (Field **field_ptr= table->field+skip ; *field_ptr ; ++field_ptr) + /* + For fields the extra fields on the slave, we check if they have a default. + The check follows the same rules as the INSERT query without specifying an + explicit value for a field not having the explicit default + (@c check_that_all_fields_are_given_values()). + */ + for (Field **field_ptr= table->field+skip; *field_ptr; ++field_ptr) { uint32 const mask= NOT_NULL_FLAG | NO_DEFAULT_VALUE_FLAG; Field *const f= *field_ptr; - - if (((f->flags & mask) == mask)) + if ((f->flags & NO_DEFAULT_VALUE_FLAG) && + (f->real_type() != MYSQL_TYPE_ENUM)) { - my_error(ER_NO_DEFAULT_FOR_FIELD, MYF(0), f->field_name); - error = HA_ERR_ROWS_EVENT_APPLY; + + MYSQL_ERROR::enum_warning_level error_type= + MYSQL_ERROR::WARN_LEVEL_NOTE; + if (abort_on_warning && (table->file->has_transactions() || + first_row)) + { + error= HA_ERR_ROWS_EVENT_APPLY; + error_type= MYSQL_ERROR::WARN_LEVEL_ERROR; + } + else + { + f->set_default(); + error_type= MYSQL_ERROR::WARN_LEVEL_WARN; + } + push_warning_printf(current_thd, error_type, + ER_NO_DEFAULT_FOR_FIELD, + ER(ER_NO_DEFAULT_FOR_FIELD), + f->field_name); } } diff --git a/sql/rpl_record.h b/sql/rpl_record.h index f9e64f0ab1d..6e8838f82b3 100644 --- a/sql/rpl_record.h +++ b/sql/rpl_record.h @@ -27,10 +27,13 @@ size_t pack_row(TABLE* table, MY_BITMAP const* cols, int unpack_row(Relay_log_info const *rli, TABLE *table, uint const colcnt, uchar const *const row_data, MY_BITMAP const *cols, - uchar const **const row_end, ulong *const master_reclength); + uchar const **const row_end, ulong *const master_reclength, + const bool abort_on_warning= TRUE, const bool first_row= TRUE); // Fill table's record[0] with default values. -int prepare_record(TABLE *const, const uint =0, const bool =FALSE); +int prepare_record(TABLE *const table, const uint skip, const bool check, + const bool abort_on_warning= TRUE, + const bool first_row= TRUE); #endif #endif diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 18fbae9bb9d..ec3adc79ca2 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -28,11 +28,12 @@ int init_intvar_from_file(int* var, IO_CACHE* f, int default_val); int init_strvar_from_file(char *var, int max_size, IO_CACHE *f, const char *default_val); - -Relay_log_info::Relay_log_info() +Relay_log_info::Relay_log_info(bool is_slave_recovery) :Slave_reporting_capability("SQL"), no_storage(FALSE), replicate_same_server_id(::replicate_same_server_id), - info_fd(-1), cur_log_fd(-1), save_temporary_tables(0), + info_fd(-1), cur_log_fd(-1), relay_log(&sync_relaylog_period), + sync_counter(0), is_relay_log_recovery(is_slave_recovery), + save_temporary_tables(0), #if HAVE_purify is_fake(FALSE), #endif @@ -258,6 +259,9 @@ Failed to open the existing relay log info file '%s' (errno %d)", rli->group_relay_log_pos= rli->event_relay_log_pos= relay_log_pos; rli->group_master_log_pos= master_log_pos; + if (rli->is_relay_log_recovery && init_recovery(rli->mi, &msg)) + goto err; + if (init_relay_log_pos(rli, rli->group_relay_log_name, rli->group_relay_log_pos, @@ -289,7 +293,10 @@ Failed to open the existing relay log info file '%s' (errno %d)", */ reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1); if ((error= flush_relay_log_info(rli))) - sql_print_error("Failed to flush relay log info file"); + { + msg= "Failed to flush relay log info file"; + goto err; + } if (count_relay_log_space(rli)) { msg="Error counting relay log space"; @@ -1188,7 +1195,6 @@ void Relay_log_info::cleanup_context(THD *thd, bool error) */ thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS; thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS; - last_event_start_time= 0; DBUG_VOID_RETURN; } diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 171778d9675..fd36d18adae 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -96,6 +96,19 @@ public: LOG_INFO linfo; IO_CACHE cache_buf,*cur_log; + /* + Keeps track of the number of transactions that commits + before fsyncing. The option --sync-relay-log-info determines + how many transactions should commit before fsyncing. + */ + uint sync_counter; + + /* + Identifies when the recovery process is going on. + See sql/slave.cc:init_recovery for further details. + */ + bool is_relay_log_recovery; + /* The following variables are safe to read any time */ /* IO_CACHE of the info file - set only during init or end */ @@ -267,7 +280,7 @@ public: char slave_patternload_file[FN_REFLEN]; size_t slave_patternload_file_size; - Relay_log_info(); + Relay_log_info(bool is_slave_recovery); ~Relay_log_info(); /* @@ -336,12 +349,10 @@ public: void clear_tables_to_lock(); /* - Used by row-based replication to detect that it should not stop at - this event, but give it a chance to send more events. The time - where the last event inside a group started is stored here. If the - variable is zero, we are not in a group (but may be in a - transaction). - */ + Used to defer stopping the SQL thread to give it a chance + to finish up the current group of events. + The timestamp is set and reset in @c sql_slave_killed(). + */ time_t last_event_start_time; /** diff --git a/sql/scheduler.h b/sql/scheduler.h index 46bbd300cbb..e7916031a27 100644 --- a/sql/scheduler.h +++ b/sql/scheduler.h @@ -1,3 +1,6 @@ +#ifndef SCHEDULER_INCLUDED +#define SCHEDULER_INCLUDED + /* Copyright (C) 2007 MySQL AB This program is free software; you can redistribute it and/or modify @@ -58,3 +61,5 @@ enum pool_command_op class thd_scheduler {}; + +#endif /* SCHEDULER_INCLUDED */ diff --git a/sql/set_var.cc b/sql/set_var.cc index 3a217066776..b20ea500eb5 100644 --- a/sql/set_var.cc +++ b/sql/set_var.cc @@ -1539,6 +1539,23 @@ static bool get_unsigned(THD *thd, set_var *var, ulonglong user_max, } +bool sys_var_uint_ptr::check(THD *thd, set_var *var) +{ + var->save_result.ulong_value= (ulong) var->value->val_uint(); + return 0; +} + +bool sys_var_uint_ptr::update(THD *thd, set_var *var) +{ + *value= (uint) var->save_result.ulong_value; + return 0; +} + +void sys_var_uint_ptr::set_default(THD *thd, enum_var_type type) +{ + *value= (uint) option_limits->def_value; +} + sys_var_long_ptr:: sys_var_long_ptr(sys_var_chain *chain, const char *name_arg, ulong *value_ptr_arg, sys_after_update_func after_update_arg) @@ -3059,6 +3076,15 @@ static bool set_option_autocommit(THD *thd, set_var *var) ulonglong org_options= thd->options; + /* + If we are setting AUTOCOMMIT=1 and it was not already 1, then we + need to commit any outstanding transactions. + */ + if (var->save_result.ulong_value != 0 && + (thd->options & OPTION_NOT_AUTOCOMMIT) && + ha_commit(thd)) + return 1; + if (var->save_result.ulong_value != 0) thd->options&= ~((sys_var_thd_bit*) var->var)->bit_flag; else @@ -3072,8 +3098,6 @@ static bool set_option_autocommit(THD *thd, set_var *var) thd->options&= ~(ulonglong) (OPTION_BEGIN | OPTION_KEEP_LOG); thd->transaction.all.modified_non_trans_table= FALSE; thd->server_status|= SERVER_STATUS_AUTOCOMMIT; - if (ha_commit(thd)) - return 1; } else { diff --git a/sql/set_var.h b/sql/set_var.h index 10e6e0f9c35..0202a15836d 100644 --- a/sql/set_var.h +++ b/sql/set_var.h @@ -1,3 +1,6 @@ +#ifndef SET_VAR_INCLUDED +#define SET_VAR_INCLUDED + /* Copyright (C) 2002-2006 MySQL AB This program is free software; you can redistribute it and/or modify @@ -172,6 +175,27 @@ public: { return (uchar*) value; } }; +/** + Unsigned int system variable class + */ +class sys_var_uint_ptr :public sys_var +{ +public: + sys_var_uint_ptr(sys_var_chain *chain, const char *name_arg, + uint *value_ptr_arg, + sys_after_update_func after_update_arg= NULL) + :sys_var(name_arg, after_update_arg), + value(value_ptr_arg) + { chain_sys_var(chain); } + bool check(THD *thd, set_var *var); + bool update(THD *thd, set_var *var); + void set_default(THD *thd, enum_var_type type); + SHOW_TYPE show_type() { return SHOW_INT; } + uchar *value_ptr(THD *thd, enum_var_type type, LEX_STRING *base) + { return (uchar*) value; } +private: + uint *value; +}; /* A global ulong variable that is protected by LOCK_global_system_variables @@ -1449,3 +1473,5 @@ void free_key_cache(const char *name, KEY_CACHE *key_cache); bool process_key_caches(process_key_cache_t func); void delete_elements(I_List<NAMED_LIST> *list, void (*free_element)(const char*, uchar*)); + +#endif /* SET_VAR_INCLUDED */ diff --git a/sql/share/errmsg.txt b/sql/share/errmsg.txt index d9486f290a9..3ae0683568e 100644 --- a/sql/share/errmsg.txt +++ b/sql/share/errmsg.txt @@ -4620,7 +4620,7 @@ ER_USER_LIMIT_REACHED 42000 swe "Användare '%-.64s' har överskridit '%s' (nuvarande värde: %ld)" ER_SPECIFIC_ACCESS_DENIED_ERROR 42000 nla "Toegang geweigerd. U moet het %-.128s privilege hebben voor deze operatie" - eng "Access denied; you need the %-.128s privilege for this operation" + eng "Access denied; you need (at least one of) the %-.128s privilege(s) for this operation" ger "Kein Zugriff. Hierfür wird die Berechtigung %-.128s benötigt" ita "Accesso non consentito. Serve il privilegio %-.128s per questa operazione" por "Acesso negado. Você precisa o privilégio %-.128s para essa operação" @@ -6238,3 +6238,6 @@ ER_BINLOG_UNSAFE_SYSTEM_FUNCTION ER_MESSAGE_AND_STATEMENT eng "%s Statement: %s" + +ER_SLAVE_IGNORE_SERVER_IDS + eng "The requested server id %d clashes with the slave startup option --replicate-same-server-id" diff --git a/sql/slave.cc b/sql/slave.cc index fac9ee214c5..834ed8ed933 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -40,6 +40,7 @@ #include <errmsg.h> #include <mysqld_error.h> #include <mysys_err.h> +#include "rpl_handler.h" #ifdef HAVE_REPLICATION @@ -48,6 +49,10 @@ #define FLAGSTR(V,F) ((V)&(F)?#F" ":"") #define MAX_SLAVE_RETRY_PAUSE 5 +/* + a parameter of sql_slave_killed() to defer the killed status +*/ +#define SLAVE_WAIT_GROUP_DONE 60 bool use_slave_mask = 0; MY_BITMAP slave_error_mask; char slave_skip_error_names[SHOW_VAR_FUNC_BUFF_SIZE]; @@ -69,6 +74,8 @@ ulonglong relay_log_space_limit = 0; int disconnect_slave_event_count = 0, abort_slave_event_count = 0; int events_till_abort = -1; +static pthread_key(Master_info*, RPL_MASTER_INFO); + enum enum_slave_reconnect_actions { SLAVE_RECON_ACT_REG= 0, @@ -220,6 +227,7 @@ void unlock_slave_threads(Master_info* mi) int init_slave() { DBUG_ENTER("init_slave"); + int error= 0; /* This is called when mysqld starts. Before client connections are @@ -231,7 +239,10 @@ int init_slave() TODO: re-write this to interate through the list of files for multi-master */ - active_mi= new Master_info; + active_mi= new Master_info(relay_log_recovery); + + if (pthread_key_create(&RPL_MASTER_INFO, NULL)) + goto err; /* If --slave-skip-errors=... was not used, the string value for the @@ -250,6 +261,7 @@ int init_slave() if (!active_mi) { sql_print_error("Failed to allocate memory for the master info structure"); + error= 1; goto err; } @@ -257,6 +269,7 @@ int init_slave() !master_host, (SLAVE_IO | SLAVE_SQL))) { sql_print_error("Failed to initialize the master info structure"); + error= 1; goto err; } @@ -275,18 +288,69 @@ int init_slave() SLAVE_IO | SLAVE_SQL)) { sql_print_error("Failed to create slave threads"); + error= 1; goto err; } } - pthread_mutex_unlock(&LOCK_active_mi); - DBUG_RETURN(0); err: pthread_mutex_unlock(&LOCK_active_mi); - DBUG_RETURN(1); + DBUG_RETURN(error); } +/* + Updates the master info based on the information stored in the + relay info and ignores relay logs previously retrieved by the IO + thread, which thus starts fetching again based on to the + group_master_log_pos and group_master_log_name. Eventually, the old + relay logs will be purged by the normal purge mechanism. + + In the feature, we should improve this routine in order to avoid throwing + away logs that are safely stored in the disk. Note also that this recovery + routine relies on the correctness of the relay-log.info and only tolerates + coordinate problems in master.info. + + In this function, there is no need for a mutex as the caller + (i.e. init_slave) already has one acquired. + + Specifically, the following structures are updated: + + 1 - mi->master_log_pos <-- rli->group_master_log_pos + 2 - mi->master_log_name <-- rli->group_master_log_name + 3 - It moves the relay log to the new relay log file, by + rli->group_relay_log_pos <-- BIN_LOG_HEADER_SIZE; + rli->event_relay_log_pos <-- BIN_LOG_HEADER_SIZE; + rli->group_relay_log_name <-- rli->relay_log.get_log_fname(); + rli->event_relay_log_name <-- rli->relay_log.get_log_fname(); + + If there is an error, it returns (1), otherwise returns (0). + */ +int init_recovery(Master_info* mi, const char** errmsg) +{ + DBUG_ENTER("init_recovery"); + + Relay_log_info *rli= &mi->rli; + if (rli->group_master_log_name[0]) + { + mi->master_log_pos= max(BIN_LOG_HEADER_SIZE, + rli->group_master_log_pos); + strmake(mi->master_log_name, rli->group_master_log_name, + sizeof(mi->master_log_name)-1); + + sql_print_warning("Recovery from master pos %ld and file %s.", + (ulong) mi->master_log_pos, mi->master_log_name); + + strmake(rli->group_relay_log_name, rli->relay_log.get_log_fname(), + sizeof(rli->group_relay_log_name)-1); + strmake(rli->event_relay_log_name, rli->relay_log.get_log_fname(), + sizeof(mi->rli.event_relay_log_name)-1); + + rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE; + } + DBUG_RETURN(0); +} + /** Convert slave skip errors bitmap into a printable string. */ @@ -730,44 +794,92 @@ static bool io_slave_killed(THD* thd, Master_info* mi) DBUG_RETURN(mi->abort_slave || abort_loop || thd->killed); } +/** + The function analyzes a possible killed status and makes + a decision whether to accept it or not. + Normally upon accepting the sql thread goes to shutdown. + In the event of deffering decision @rli->last_event_start_time waiting + timer is set to force the killed status be accepted upon its expiration. + + @param thd pointer to a THD instance + @param rli pointer to Relay_log_info instance + @return TRUE the killed status is recognized, FALSE a possible killed + status is deferred. +*/ static bool sql_slave_killed(THD* thd, Relay_log_info* rli) { + bool ret= FALSE; DBUG_ENTER("sql_slave_killed"); DBUG_ASSERT(rli->sql_thd == thd); DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun if (abort_loop || thd->killed || rli->abort_slave) { - if (rli->abort_slave && rli->is_in_group() && - thd->transaction.all.modified_non_trans_table) - DBUG_RETURN(0); - /* - If we are in an unsafe situation (stopping could corrupt replication), - we give one minute to the slave SQL thread of grace before really - terminating, in the hope that it will be able to read more events and - the unsafe situation will soon be left. Note that this one minute starts - from the last time anything happened in the slave SQL thread. So it's - really one minute of idleness, we don't timeout if the slave SQL thread - is actively working. - */ - if (rli->last_event_start_time == 0) - DBUG_RETURN(1); - DBUG_PRINT("info", ("Slave SQL thread is in an unsafe situation, giving " - "it some grace period")); - if (difftime(time(0), rli->last_event_start_time) > 60) + if (thd->transaction.all.modified_non_trans_table && rli->is_in_group()) { - rli->report(ERROR_LEVEL, 0, - "SQL thread had to stop in an unsafe situation, in " - "the middle of applying updates to a " - "non-transactional table without any primary key. " - "There is a risk of duplicate updates when the slave " - "SQL thread is restarted. Please check your tables' " - "contents after restart."); - DBUG_RETURN(1); + char msg_stopped[]= + "... The slave SQL is stopped, leaving the current group " + "of events unfinished with a non-transaction table changed. " + "If the group consists solely of Row-based events, you can try " + "restarting the slave with --slave-exec-mode=IDEMPOTENT, which " + "ignores duplicate key, key not found, and similar errors (see " + "documentation for details)."; + + if (rli->abort_slave) + { + DBUG_PRINT("info", ("Slave SQL thread is being stopped in the middle of" + " a group having updated a non-trans table, giving" + " it some grace period")); + + /* + Slave sql thread shutdown in face of unfinished group modified + Non-trans table is handled via a timer. The slave may eventually + give out to complete the current group and in that case there + might be issues at consequent slave restart, see the error message. + WL#2975 offers a robust solution requiring to store the last exectuted + event's coordinates along with the group's coordianates + instead of waiting with @c last_event_start_time the timer. + */ + + if (rli->last_event_start_time == 0) + rli->last_event_start_time= my_time(0); + ret= difftime(my_time(0), rli->last_event_start_time) <= + SLAVE_WAIT_GROUP_DONE ? FALSE : TRUE; + + DBUG_EXECUTE_IF("stop_slave_middle_group", + DBUG_EXECUTE_IF("incomplete_group_in_relay_log", + ret= TRUE;);); // time is over + + if (ret == 0) + { + rli->report(WARNING_LEVEL, 0, + "slave SQL thread is being stopped in the middle " + "of applying of a group having updated a non-transaction " + "table; waiting for the group completion ... "); + } + else + { + rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, + ER(ER_SLAVE_FATAL_ERROR), msg_stopped); + } + } + else + { + ret= TRUE; + rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, ER(ER_SLAVE_FATAL_ERROR), + msg_stopped); + } + } + else + { + ret= TRUE; } } - DBUG_RETURN(0); + if (ret) + rli->last_event_start_time= 0; + + DBUG_RETURN(ret); } @@ -860,6 +972,126 @@ int init_intvar_from_file(int* var, IO_CACHE* f, int default_val) DBUG_RETURN(1); } +int init_floatvar_from_file(float* var, IO_CACHE* f, float default_val) +{ + char buf[16]; + DBUG_ENTER("init_floatvar_from_file"); + + + if (my_b_gets(f, buf, sizeof(buf))) + { + if (sscanf(buf, "%f", var) != 1) + DBUG_RETURN(1); + else + DBUG_RETURN(0); + } + else if (default_val != 0.0) + { + *var = default_val; + DBUG_RETURN(0); + } + DBUG_RETURN(1); +} + + +/** + A master info read method + + This function is called from @c init_master_info() along with + relatives to restore some of @c active_mi members. + Particularly, this function is responsible for restoring + IGNORE_SERVER_IDS list of servers whose events the slave is + going to ignore (to not log them in the relay log). + Items being read are supposed to be decimal output of values of a + type shorter or equal of @c long and separated by the single space. + + @param arr @c DYNAMIC_ARRAY pointer to storage for servers id + @param f @c IO_CACHE pointer to the source file + + @retval 0 All OK + @retval non-zero An error +*/ + +int init_dynarray_intvar_from_file(DYNAMIC_ARRAY* arr, IO_CACHE* f) +{ + int ret= 0; + char buf[16 * (sizeof(long)*4 + 1)]; // static buffer to use most of times + char *buf_act= buf; // actual buffer can be dynamic if static is short + char *token, *last; + uint num_items; // number of items of `arr' + size_t read_size; + DBUG_ENTER("init_dynarray_intvar_from_file"); + + if ((read_size= my_b_gets(f, buf_act, sizeof(buf))) == 0) + { + return 0; // no line in master.info + } + if (read_size + 1 == sizeof(buf) && buf[sizeof(buf) - 2] != '\n') + { + /* + short read happend; allocate sufficient memory and make the 2nd read + */ + char buf_work[(sizeof(long)*3 + 1)*16]; + memcpy(buf_work, buf, sizeof(buf_work)); + num_items= atoi(strtok_r(buf_work, " ", &last)); + size_t snd_size; + /* + max size lower bound approximate estimation bases on the formula: + (the items number + items themselves) * + (decimal size + space) - 1 + `\n' + '\0' + */ + size_t max_size= (1 + num_items) * (sizeof(long)*3 + 1) + 1; + buf_act= (char*) my_malloc(max_size, MYF(MY_WME)); + memcpy(buf_act, buf, read_size); + snd_size= my_b_gets(f, buf_act + read_size, max_size - read_size); + if (snd_size == 0 || + (snd_size + 1 == max_size - read_size) && buf[max_size - 2] != '\n') + { + /* + failure to make the 2nd read or short read again + */ + ret= 1; + goto err; + } + } + token= strtok_r(buf_act, " ", &last); + if (token == NULL) + { + ret= 1; + goto err; + } + num_items= atoi(token); + for (uint i=0; i < num_items; i++) + { + token= strtok_r(NULL, " ", &last); + if (token == NULL) + { + ret= 1; + goto err; + } + else + { + ulong val= atol(token); + insert_dynamic(arr, (uchar *) &val); + } + } +err: + if (buf_act != buf) + my_free(buf_act, MYF(0)); + DBUG_RETURN(ret); +} + + +static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info) +{ + if (io_slave_killed(thd, mi)) + { + if (info && global_system_variables.log_warnings) + sql_print_information(info); + return TRUE; + } + return FALSE; +} /* Check if the error is caused by network. @@ -1028,7 +1260,7 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi) (master_res= mysql_store_result(mysql)) && (master_row= mysql_fetch_row(master_res))) { - if ((::server_id == strtoul(master_row[1], 0, 10)) && + if ((::server_id == (mi->master_id= strtoul(master_row[1], 0, 10))) && !mi->rli.replicate_same_server_id) { errmsg= "The slave I/O thread stops because master and slave have equal \ @@ -1066,6 +1298,13 @@ maybe it is a *VERY OLD MASTER*."); mysql_free_result(master_res); master_res= NULL; } + if (mi->master_id == 0 && mi->ignore_server_ids.elements > 0) + { + errmsg= "Slave configured with server id filtering could not detect the master server id."; + err_code= ER_SLAVE_FATAL_ERROR; + sprintf(err_buff, ER(err_code), errmsg); + goto err; + } /* Check that the master's global character_set_server and ours are the same. @@ -1189,6 +1428,31 @@ when it try to get the value of TIME_ZONE global variable from master."; } } + if (mi->heartbeat_period != 0.0) + { + char llbuf[22]; + const char query_format[]= "SET @master_heartbeat_period= %s"; + char query[sizeof(query_format) - 2 + sizeof(llbuf)]; + /* + the period is an ulonglong of nano-secs. + */ + llstr((ulonglong) (mi->heartbeat_period*1000000000UL), llbuf); + my_sprintf(query, (query, query_format, llbuf)); + + if (mysql_real_query(mysql, query, strlen(query)) + && !check_io_slave_killed(mi->io_thd, mi, NULL)) + { + errmsg= "The slave I/O thread stops because SET @master_heartbeat_period " + "on master failed."; + err_code= ER_SLAVE_FATAL_ERROR; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + mysql_free_result(mysql_store_result(mysql)); + goto err; + } + mysql_free_result(mysql_store_result(mysql)); + } + + err: if (errmsg) { @@ -1604,6 +1868,10 @@ bool show_master_info(THD* thd, Master_info* mi) field_list.push_back(new Item_empty_string("Last_IO_Error", 20)); field_list.push_back(new Item_return_int("Last_SQL_Errno", 4, MYSQL_TYPE_LONG)); field_list.push_back(new Item_empty_string("Last_SQL_Error", 20)); + field_list.push_back(new Item_empty_string("Replicate_Ignore_Server_Ids", + FN_REFLEN)); + field_list.push_back(new Item_return_int("Master_Server_Id", sizeof(ulong), + MYSQL_TYPE_LONG)); if (protocol->send_fields(&field_list, Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) @@ -1639,7 +1907,8 @@ bool show_master_info(THD* thd, Master_info* mi) protocol->store((ulonglong) mi->rli.group_relay_log_pos); protocol->store(mi->rli.group_master_log_name, &my_charset_bin); protocol->store(mi->slave_running == MYSQL_SLAVE_RUN_CONNECT ? - "Yes" : "No", &my_charset_bin); + "Yes" : (mi->slave_running == MYSQL_SLAVE_RUN_NOT_CONNECT ? + "Connecting" : "No"), &my_charset_bin); protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin); protocol->store(rpl_filter->get_do_db()); protocol->store(rpl_filter->get_ignore_db()); @@ -1725,6 +1994,32 @@ bool show_master_info(THD* thd, Master_info* mi) protocol->store(mi->rli.last_error().number); // Last_SQL_Error protocol->store(mi->rli.last_error().message, &my_charset_bin); + // Replicate_Ignore_Server_Ids + { + char buff[FN_REFLEN]; + ulong i, cur_len; + for (i= 0, buff[0]= 0, cur_len= 0; + i < mi->ignore_server_ids.elements; i++) + { + ulong s_id, slen; + char sbuff[FN_REFLEN]; + get_dynamic(&mi->ignore_server_ids, (uchar*) &s_id, i); + slen= my_sprintf(sbuff, (sbuff, (i==0? "%lu" : ", %lu"), s_id)); + if (cur_len + slen + 4 > FN_REFLEN) + { + /* + break the loop whenever remained space could not fit + ellipses on the next cycle + */ + my_sprintf(buff + cur_len, (buff + cur_len, "...")); + break; + } + cur_len += my_sprintf(buff + cur_len, (buff + cur_len, "%s", sbuff)); + } + protocol->store(buff, &my_charset_bin); + } + // Master_Server_id + protocol->store((uint32) mi->master_id); pthread_mutex_unlock(&mi->rli.err_lock); pthread_mutex_unlock(&mi->err_lock); @@ -1868,17 +2163,22 @@ static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed, } -static int request_dump(MYSQL* mysql, Master_info* mi, - bool *suppress_warnings) +static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi, + bool *suppress_warnings) { uchar buf[FN_REFLEN + 10]; int len; - int binlog_flags = 0; // for now + ushort binlog_flags = 0; // for now char* logname = mi->master_log_name; DBUG_ENTER("request_dump"); *suppress_warnings= FALSE; + if (RUN_HOOK(binlog_relay_io, + before_request_transmit, + (thd, mi, binlog_flags))) + DBUG_RETURN(1); + // TODO if big log files: Change next to int8store() int4store(buf, (ulong) mi->master_log_pos); int2store(buf + 4, binlog_flags); @@ -2278,6 +2578,27 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli) delete ev; DBUG_RETURN(1); } + + { /** + The following failure injecion works in cooperation with tests + setting @@global.debug= 'd,incomplete_group_in_relay_log'. + Xid or Commit events are not executed to force the slave sql + read hanging if the realy log does not have any more events. + */ + DBUG_EXECUTE_IF("incomplete_group_in_relay_log", + if ((ev->get_type_code() == XID_EVENT) || + ((ev->get_type_code() == QUERY_EVENT) && + strcmp("COMMIT", ((Query_log_event *) ev)->query) == 0)) + { + DBUG_ASSERT(thd->transaction.all.modified_non_trans_table); + rli->abort_slave= 1; + pthread_mutex_unlock(&rli->data_lock); + delete ev; + rli->inc_event_relay_log_pos(); + DBUG_RETURN(0); + };); + } + exec_res= apply_event_and_update_pos(ev, thd, rli, TRUE); /* @@ -2381,18 +2702,6 @@ on this slave.\ } -static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info) -{ - if (io_slave_killed(thd, mi)) - { - if (info && global_system_variables.log_warnings) - sql_print_information(info); - return TRUE; - } - return FALSE; -} - - /** @brief Try to reconnect slave IO thread. @@ -2532,6 +2841,16 @@ pthread_handler_t handle_slave_io(void *arg) mi->master_log_name, llstr(mi->master_log_pos,llbuff))); + /* This must be called before run any binlog_relay_io hooks */ + my_pthread_setspecific_ptr(RPL_MASTER_INFO, mi); + + if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi))) + { + mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, + ER(ER_SLAVE_FATAL_ERROR), "Failed to run 'thread_start' hook"); + goto err; + } + if (!(mi->mysql = mysql = mysql_init(NULL))) { mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, @@ -2621,7 +2940,7 @@ connected: while (!io_slave_killed(thd,mi)) { thd_proc_info(thd, "Requesting binlog dump"); - if (request_dump(mysql, mi, &suppress_warnings)) + if (request_dump(thd, mysql, mi, &suppress_warnings)) { sql_print_error("Failed on request_dump()"); if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \ @@ -2641,6 +2960,7 @@ requesting master dump") || goto err; goto connected; }); + const char *event_buf; DBUG_ASSERT(mi->last_error().number == 0); while (!io_slave_killed(thd,mi)) @@ -2697,14 +3017,37 @@ Stopping slave I/O thread due to out-of-memory error from master"); retry_count=0; // ok event, reset retry counter thd_proc_info(thd, "Queueing master event to the relay log"); - if (queue_event(mi,(const char*)mysql->net.read_pos + 1, - event_len)) + event_buf= (const char*)mysql->net.read_pos + 1; + if (RUN_HOOK(binlog_relay_io, after_read_event, + (thd, mi,(const char*)mysql->net.read_pos + 1, + event_len, &event_buf, &event_len))) + { + mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, + ER(ER_SLAVE_FATAL_ERROR), + "Failed to run 'after_read_event' hook"); + goto err; + } + + /* XXX: 'synced' should be updated by queue_event to indicate + whether event has been synced to disk */ + bool synced= 0; + if (queue_event(mi, event_buf, event_len)) { mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE), "could not queue event from master"); goto err; } + + if (RUN_HOOK(binlog_relay_io, after_queue_event, + (thd, mi, event_buf, event_len, synced))) + { + mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, + ER(ER_SLAVE_FATAL_ERROR), + "Failed to run 'after_queue_event' hook"); + goto err; + } + if (flush_master_info(mi, 1)) { sql_print_error("Failed to flush master info file"); @@ -2750,6 +3093,7 @@ err: // print the current replication position sql_print_information("Slave I/O thread exiting, read up to log '%s', position %s", IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff)); + RUN_HOOK(binlog_relay_io, thread_stop, (thd, mi)); thd->set_query(NULL, 0); thd->reset_db(NULL, 0); if (mysql) @@ -3552,9 +3896,11 @@ static int queue_old_event(Master_info *mi, const char *buf, static int queue_event(Master_info* mi,const char* buf, ulong event_len) { int error= 0; + String error_msg; ulong inc_pos; Relay_log_info *rli= &mi->rli; pthread_mutex_t *log_lock= rli->relay_log.get_log_lock(); + ulong s_id; DBUG_ENTER("queue_event"); LINT_INIT(inc_pos); @@ -3586,7 +3932,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue); if (unlikely(process_io_rotate(mi,&rev))) { - error= 1; + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; goto err; } /* @@ -3613,7 +3959,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) Log_event::read_log_event(buf, event_len, &errmsg, mi->rli.relay_log.description_event_for_queue))) { - error= 2; + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; goto err; } delete mi->rli.relay_log.description_event_for_queue; @@ -3632,6 +3978,56 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) } break; + + case HEARTBEAT_LOG_EVENT: + { + /* + HB (heartbeat) cannot come before RL (Relay) + */ + char llbuf[22]; + Heartbeat_log_event hb(buf, event_len, mi->rli.relay_log.description_event_for_queue); + if (!hb.is_valid()) + { + error= ER_SLAVE_HEARTBEAT_FAILURE; + error_msg.append(STRING_WITH_LEN("inconsistent heartbeat event content;")); + error_msg.append(STRING_WITH_LEN("the event's data: log_file_name ")); + error_msg.append(hb.get_log_ident(), (uint) strlen(hb.get_log_ident())); + error_msg.append(STRING_WITH_LEN(" log_pos ")); + llstr(hb.log_pos, llbuf); + error_msg.append(llbuf, strlen(llbuf)); + goto err; + } + mi->received_heartbeats++; + /* + compare local and event's versions of log_file, log_pos. + + Heartbeat is sent only after an event corresponding to the corrdinates + the heartbeat carries. + Slave can not have a difference in coordinates except in the only + special case when mi->master_log_name, master_log_pos have never + been updated by Rotate event i.e when slave does not have any history + with the master (and thereafter mi->master_log_pos is NULL). + + TODO: handling `when' for SHOW SLAVE STATUS' snds behind + */ + if ((memcmp(mi->master_log_name, hb.get_log_ident(), hb.get_ident_len()) + && mi->master_log_name != NULL) + || mi->master_log_pos != hb.log_pos) + { + /* missed events of heartbeat from the past */ + error= ER_SLAVE_HEARTBEAT_FAILURE; + error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;")); + error_msg.append(STRING_WITH_LEN("the event's data: log_file_name ")); + error_msg.append(hb.get_log_ident(), (uint) strlen(hb.get_log_ident())); + error_msg.append(STRING_WITH_LEN(" log_pos ")); + llstr(hb.log_pos, llbuf); + error_msg.append(llbuf, strlen(llbuf)); + goto err; + } + goto skip_relay_logging; + } + break; + default: inc_pos= event_len; break; @@ -3651,9 +4047,20 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) */ pthread_mutex_lock(log_lock); - - if ((uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) && - !mi->rli.replicate_same_server_id) + s_id= uint4korr(buf + SERVER_ID_OFFSET); + if ((s_id == ::server_id && !mi->rli.replicate_same_server_id) || + /* + the following conjunction deals with IGNORE_SERVER_IDS, if set + If the master is on the ignore list, execution of + format description log events and rotate events is necessary. + */ + (mi->ignore_server_ids.elements > 0 && + mi->shall_ignore_server_id(s_id) && + /* everything is filtered out from non-master */ + (s_id != mi->master_id || + /* for the master meta information is necessary */ + buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT && + buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT))) { /* Do not write it to the relay log. @@ -3668,10 +4075,14 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) But events which were generated by this slave and which do not exist in the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment mi->master_log_pos. + If the event is originated remotely and is being filtered out by + IGNORE_SERVER_IDS it increments mi->master_log_pos + as well as rli->group_relay_log_pos. */ - if (buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT && - buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT && - buf[EVENT_TYPE_OFFSET]!=STOP_EVENT) + if (!(s_id == ::server_id && !mi->rli.replicate_same_server_id) || + buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT && + buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT && + buf[EVENT_TYPE_OFFSET] != STOP_EVENT) { mi->master_log_pos+= inc_pos; memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN); @@ -3679,8 +4090,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) rli->ign_master_log_pos_end= mi->master_log_pos; } rli->relay_log.signal_update(); // the slave SQL thread needs to re-check - DBUG_PRINT("info", ("master_log_pos: %lu, event originating from the same server, ignored", - (ulong) mi->master_log_pos)); + DBUG_PRINT("info", ("master_log_pos: %lu, event originating from %u server, ignored", + (ulong) mi->master_log_pos, uint4korr(buf + SERVER_ID_OFFSET))); } else { @@ -3692,15 +4103,23 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) rli->relay_log.harvest_bytes_written(&rli->log_space_total); } else - error= 3; + { + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + } rli->ign_master_log_name_end[0]= 0; // last event is not ignored } pthread_mutex_unlock(log_lock); - +skip_relay_logging: + err: pthread_mutex_unlock(&mi->data_lock); DBUG_PRINT("info", ("error: %d", error)); + if (error) + mi->report(ERROR_LEVEL, error, ER(error), + (error == ER_SLAVE_RELAY_LOG_WRITE_FAILURE)? + "could not queue event from master" : + error_msg.ptr()); DBUG_RETURN(error); } @@ -3906,6 +4325,71 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi, } +MYSQL *rpl_connect_master(MYSQL *mysql) +{ + THD *thd= current_thd; + Master_info *mi= my_pthread_getspecific_ptr(Master_info*, RPL_MASTER_INFO); + if (!mi) + { + sql_print_error("'rpl_connect_master' must be called in slave I/O thread context."); + return NULL; + } + + bool allocated= false; + + if (!mysql) + { + if(!(mysql= mysql_init(NULL))) + { + sql_print_error("rpl_connect_master: failed in mysql_init()"); + return NULL; + } + allocated= true; + } + + /* + XXX: copied from connect_to_master, this function should not + change the slave status, so we cannot use connect_to_master + directly + + TODO: make this part a seperate function to eliminate duplication + */ + mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout); + mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout); + +#ifdef HAVE_OPENSSL + if (mi->ssl) + { + mysql_ssl_set(mysql, + mi->ssl_key[0]?mi->ssl_key:0, + mi->ssl_cert[0]?mi->ssl_cert:0, + mi->ssl_ca[0]?mi->ssl_ca:0, + mi->ssl_capath[0]?mi->ssl_capath:0, + mi->ssl_cipher[0]?mi->ssl_cipher:0); + mysql_options(mysql, MYSQL_OPT_SSL_VERIFY_SERVER_CERT, + &mi->ssl_verify_server_cert); + } +#endif + + mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname); + /* This one is not strictly needed but we have it here for completeness */ + mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir); + + if (io_slave_killed(thd, mi) + || !mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0, + mi->port, 0, 0)) + { + if (!io_slave_killed(thd, mi)) + sql_print_error("rpl_connect_master: error connecting to master: %s (server_error: %d)", + mysql_error(mysql), mysql_errno(mysql)); + + if (allocated) + mysql_close(mysql); // this will free the object + return NULL; + } + return mysql; +} + /* Store the file and position where the execute-slave thread are in the relay log. @@ -3959,7 +4443,14 @@ bool flush_relay_log_info(Relay_log_info* rli) error=1; if (flush_io_cache(file)) error=1; - + if (sync_relayloginfo_period && + !error && + ++(rli->sync_counter) >= sync_relayloginfo_period) + { + if (my_sync(rli->info_fd, MYF(MY_WME))) + error=1; + rli->sync_counter= 0; + } /* Flushing the relay log is done by the slave I/O thread */ DBUG_RETURN(error); } @@ -4208,8 +4699,8 @@ static Log_event* next_event(Relay_log_info* rli) */ pthread_mutex_unlock(&rli->log_space_lock); pthread_cond_broadcast(&rli->log_space_cond); - // Note that wait_for_update unlocks lock_log ! - rli->relay_log.wait_for_update(rli->sql_thd, 1); + // Note that wait_for_update_relay_log unlocks lock_log ! + rli->relay_log.wait_for_update_relay_log(rli->sql_thd); // re-acquire data lock since we released it earlier pthread_mutex_lock(&rli->data_lock); rli->last_master_timestamp= save_timestamp; @@ -4366,6 +4857,8 @@ void rotate_relay_log(Master_info* mi) DBUG_ENTER("rotate_relay_log"); Relay_log_info* rli= &mi->rli; + DBUG_EXECUTE_IF("crash_before_rotate_relaylog", abort();); + /* We don't lock rli->run_lock. This would lead to deadlocks. */ pthread_mutex_lock(&mi->run_lock); diff --git a/sql/slave.h b/sql/slave.h index a44a7eed83e..48213316b98 100644 --- a/sql/slave.h +++ b/sql/slave.h @@ -22,6 +22,17 @@ @file */ + +/** + Some of defines are need in parser even though replication is not + compiled in (embedded). +*/ + +/** + The maximum is defined as (ULONG_MAX/1000) with 4 bytes ulong +*/ +#define SLAVE_MAX_HEARTBEAT_PERIOD 4294967 + #ifdef HAVE_REPLICATION #include "log.h" @@ -33,7 +44,6 @@ #define MAX_SLAVE_ERROR 2000 - // Forward declarations class Relay_log_info; class Master_info; @@ -134,6 +144,7 @@ extern ulonglong relay_log_space_limit; #define SLAVE_FORCE_ALL 4 int init_slave(); +int init_recovery(Master_info* mi, const char** errmsg); void init_slave_skip_errors(const char* arg); bool flush_relay_log_info(Relay_log_info* rli); int register_slave_on_master(MYSQL* mysql); diff --git a/sql/sp_head.cc b/sql/sp_head.cc index bb871a41846..ef5dc8d3351 100644 --- a/sql/sp_head.cc +++ b/sql/sp_head.cc @@ -174,6 +174,7 @@ sp_get_flags_for_command(LEX *lex) case SQLCOM_SHOW_AUTHORS: case SQLCOM_SHOW_BINLOGS: case SQLCOM_SHOW_BINLOG_EVENTS: + case SQLCOM_SHOW_RELAYLOG_EVENTS: case SQLCOM_SHOW_CHARSETS: case SQLCOM_SHOW_COLLATIONS: case SQLCOM_SHOW_COLUMN_TYPES: diff --git a/sql/sql_acl.h b/sql/sql_acl.h index a8090fba2e7..c0622bd747c 100644 --- a/sql/sql_acl.h +++ b/sql/sql_acl.h @@ -1,3 +1,6 @@ +#ifndef SQL_ACL_INCLUDED +#define SQL_ACL_INCLUDED + /* Copyright (C) 2000-2006 MySQL AB This program is free software; you can redistribute it and/or modify @@ -273,3 +276,4 @@ bool is_acl_user(const char *host, const char *user); #define check_grant(A,B,C,D,E,F) 0 #define check_grant_db(A,B) 0 #endif +#endif /* SQL_ACL_INCLUDED */ diff --git a/sql/sql_analyse.h b/sql/sql_analyse.h index 8807b40857e..8f52b90c874 100644 --- a/sql/sql_analyse.h +++ b/sql/sql_analyse.h @@ -1,3 +1,6 @@ +#ifndef SQL_ANALYSE_INCLUDED +#define SQL_ANALYSE_INCLUDED + /* Copyright (C) 2000-2003, 2005 MySQL AB This program is free software; you can redistribute it and/or modify @@ -355,3 +358,5 @@ public: select_result *result, List<Item> &field_list); }; + +#endif /* SQL_ANALYSE_INCLUDED */ diff --git a/sql/sql_array.h b/sql/sql_array.h index e1b22921519..dfaa9b02947 100644 --- a/sql/sql_array.h +++ b/sql/sql_array.h @@ -1,3 +1,6 @@ +#ifndef SQL_ARRAY_INCLUDED +#define SQL_ARRAY_INCLUDED + /* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify @@ -66,3 +69,4 @@ public: } }; +#endif /* SQL_ARRAY_INCLUDED */ diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc index 96e99b57e3c..531242f64d1 100644 --- a/sql/sql_binlog.cc +++ b/sql/sql_binlog.cc @@ -58,7 +58,7 @@ void mysql_client_binlog_statement(THD* thd) my_bool have_fd_event= TRUE; if (!thd->rli_fake) { - thd->rli_fake= new Relay_log_info; + thd->rli_fake= new Relay_log_info(FALSE); #ifdef HAVE_purify thd->rli_fake->is_fake= TRUE; #endif diff --git a/sql/sql_class.cc b/sql/sql_class.cc index a59ae99a16d..2301dec5b51 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -279,6 +279,42 @@ const char *set_thd_proc_info(THD *thd, const char *info, } extern "C" +const char* thd_enter_cond(MYSQL_THD thd, pthread_cond_t *cond, + pthread_mutex_t *mutex, const char *msg) +{ + if (!thd) + thd= current_thd; + + const char* old_msg = thd->proc_info; + safe_mutex_assert_owner(mutex); + thd->mysys_var->current_mutex = mutex; + thd->mysys_var->current_cond = cond; + thd->proc_info = msg; + return old_msg; +} + +extern "C" +void thd_exit_cond(MYSQL_THD thd, const char *old_msg) +{ + if (!thd) + thd= current_thd; + + /* + Putting the mutex unlock in thd_exit_cond() ensures that + mysys_var->current_mutex is always unlocked _before_ mysys_var->mutex is + locked (if that would not be the case, you'll get a deadlock if someone + does a THD::awake() on you). + */ + pthread_mutex_unlock(thd->mysys_var->current_mutex); + pthread_mutex_lock(&thd->mysys_var->mutex); + thd->mysys_var->current_mutex = 0; + thd->mysys_var->current_cond = 0; + thd->proc_info = old_msg; + pthread_mutex_unlock(&thd->mysys_var->mutex); + return; +} + +extern "C" void **thd_ha_data(const THD *thd, const struct handlerton *hton) { return (void **) &thd->ha_data[hton->slot].ha_ptr; @@ -587,9 +623,6 @@ THD::THD() limit_found_rows= 0; row_count_func= -1; statement_id_counter= 0UL; -#ifdef ERROR_INJECT_SUPPORT - error_inject_value= 0UL; -#endif // Must be reset to handle error with THD's created for init of mysqld lex->current_select= 0; start_time=(time_t) 0; diff --git a/sql/sql_class.h b/sql/sql_class.h index ad047514ae5..521884f3f4d 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -14,6 +14,9 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ +#ifndef SQL_CLASS_INCLUDED +#define SQL_CLASS_INCLUDED + /* Classes in mysql */ #ifdef USE_PRAGMA_INTERFACE @@ -22,6 +25,7 @@ #include "log.h" #include "rpl_tblmap.h" +#include "replication.h" /** An interface that is used to take an action when @@ -1759,9 +1763,6 @@ public: query_id_t query_id, warn_id; ulong col_access; -#ifdef ERROR_INJECT_SUPPORT - ulong error_inject_value; -#endif /* Statement id is thread-wide. This counter is used to generate ids */ ulong statement_id_counter; ulong rand_saved_seed1, rand_saved_seed2; @@ -1971,27 +1972,11 @@ public: inline const char* enter_cond(pthread_cond_t *cond, pthread_mutex_t* mutex, const char* msg) { - const char* old_msg = proc_info; - safe_mutex_assert_owner(mutex); - mysys_var->current_mutex = mutex; - mysys_var->current_cond = cond; - proc_info = msg; - return old_msg; + return thd_enter_cond(this, cond, mutex, msg); } inline void exit_cond(const char* old_msg) { - /* - Putting the mutex unlock in exit_cond() ensures that - mysys_var->current_mutex is always unlocked _before_ mysys_var->mutex is - locked (if that would not be the case, you'll get a deadlock if someone - does a THD::awake() on you). - */ - pthread_mutex_unlock(mysys_var->current_mutex); - pthread_mutex_lock(&mysys_var->mutex); - mysys_var->current_mutex = 0; - mysys_var->current_cond = 0; - proc_info = old_msg; - pthread_mutex_unlock(&mysys_var->mutex); + thd_exit_cond(this, old_msg); } inline time_t query_start() { query_start_used=1; return start_time; } inline void set_time() @@ -3058,3 +3043,4 @@ void add_diff_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var, void mark_transaction_to_rollback(THD *thd, bool all); #endif /* MYSQL_SERVER */ +#endif /* SQL_CLASS_INCLUDED */ diff --git a/sql/sql_crypt.h b/sql/sql_crypt.h index a5a6bee8a58..8d5a761cbdf 100644 --- a/sql/sql_crypt.h +++ b/sql/sql_crypt.h @@ -1,3 +1,6 @@ +#ifndef SQL_CRYPT_INCLUDED +#define SQL_CRYPT_INCLUDED + /* Copyright (C) 2000-2001, 2005 MySQL AB This program is free software; you can redistribute it and/or modify @@ -35,3 +38,5 @@ class SQL_CRYPT :public Sql_alloc void encode(char *str, uint length); void decode(char *str, uint length); }; + +#endif /* SQL_CRYPT_INCLUDED */ diff --git a/sql/sql_lex.h b/sql/sql_lex.h index 31bfc085b4c..db4b8701635 100644 --- a/sql/sql_lex.h +++ b/sql/sql_lex.h @@ -17,6 +17,9 @@ @defgroup Semantic_Analysis Semantic Analysis */ +#ifndef SQL_LEX_INCLUDED +#define SQL_LEX_INCLUDED + /* YACC and LEX Definitions */ /* These may not be declared yet */ @@ -118,7 +121,7 @@ enum enum_sql_command { SQLCOM_SHOW_CREATE_TRIGGER, SQLCOM_ALTER_DB_UPGRADE, SQLCOM_SHOW_PROFILE, SQLCOM_SHOW_PROFILES, - + SQLCOM_SHOW_RELAYLOG_EVENTS, /* When a command is added here, be sure it's also added in mysqld.cc in "struct show_var_st status_vars[]= {" ... @@ -203,17 +206,19 @@ typedef struct st_lex_master_info { char *host, *user, *password, *log_file_name; uint port, connect_retry; + float heartbeat_period; ulonglong pos; ulong server_id; /* Enum is used for making it possible to detect if the user changed variable or if it should be left at old value */ - enum {SSL_UNCHANGED, SSL_DISABLE, SSL_ENABLE} - ssl, ssl_verify_server_cert; + enum {LEX_MI_UNCHANGED, LEX_MI_DISABLE, LEX_MI_ENABLE} + ssl, ssl_verify_server_cert, heartbeat_opt, repl_ignore_server_ids_opt; char *ssl_key, *ssl_cert, *ssl_ca, *ssl_capath, *ssl_cipher; char *relay_log_name; ulong relay_log_pos; + DYNAMIC_ARRAY repl_ignore_server_ids; } LEX_MASTER_INFO; @@ -2125,3 +2130,4 @@ extern bool is_lex_native_function(const LEX_STRING *name); int my_missing_function_error(const LEX_STRING &token, const char *name); #endif /* MYSQL_SERVER */ +#endif /* SQL_LEX_INCLUDED */ diff --git a/sql/sql_map.h b/sql/sql_map.h index a1efba0da6f..5ae260841e0 100644 --- a/sql/sql_map.h +++ b/sql/sql_map.h @@ -1,3 +1,6 @@ +#ifndef SQL_MAP_INCLUDED +#define SQL_MAP_INCLUDED + /* Copyright (C) 2000-2001, 2005 MySQL AB This program is free software; you can redistribute it and/or modify @@ -60,3 +63,5 @@ public: return file->map; } }; + +#endif /* SQL_MAP_INCLUDED */ diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 8dc3195044e..6eb2b1879c1 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -21,6 +21,7 @@ #include <m_ctype.h> #include <myisam.h> #include <my_dir.h> +#include "rpl_handler.h" #include "sp_head.h" #include "sp.h" @@ -1615,9 +1616,9 @@ void log_slow_statement(THD *thd) /* Do not log administrative statements unless the appropriate option is - set; do not log into slow log if reading from backup. + set. */ - if (thd->enable_slow_log && !thd->user_time) + if (thd->enable_slow_log) { ulonglong end_utime_of_query= thd->current_utime(); thd_proc_info(thd, "logging slow query"); @@ -2319,6 +2320,7 @@ mysql_execute_command(THD *thd) res = show_slave_hosts(thd); break; } + case SQLCOM_SHOW_RELAYLOG_EVENTS: /* fall through */ case SQLCOM_SHOW_BINLOG_EVENTS: { if (check_global_access(thd, REPL_SLAVE_ACL)) diff --git a/sql/sql_partition.cc b/sql/sql_partition.cc index 08ff2daacb9..a4c0f02e480 100644 --- a/sql/sql_partition.cc +++ b/sql/sql_partition.cc @@ -40,6 +40,10 @@ #ifdef WITH_PARTITION_STORAGE_ENGINE #include "ha_partition.h" + +#define ERROR_INJECT_CRASH(code) \ + DBUG_EVALUATE_IF(code, (abort(), 0), 0) + /* Partition related functions declarations and some static constants; */ diff --git a/sql/sql_partition.h b/sql/sql_partition.h index 282e24f1853..0c47340016c 100644 --- a/sql/sql_partition.h +++ b/sql/sql_partition.h @@ -1,3 +1,6 @@ +#ifndef SQL_PARTITION_INCLUDED +#define SQL_PARTITION_INCLUDED + /* Copyright (C) 2006 MySQL AB This program is free software; you can redistribute it and/or modify @@ -209,3 +212,4 @@ typedef int (*get_partitions_in_range_iter)(partition_info *part_info, #include "partition_info.h" +#endif /* SQL_PARTITION_INCLUDED */ diff --git a/sql/sql_plugin.cc b/sql/sql_plugin.cc index da168d36429..8cf8c4cb81f 100644 --- a/sql/sql_plugin.cc +++ b/sql/sql_plugin.cc @@ -19,14 +19,6 @@ #define REPORT_TO_LOG 1 #define REPORT_TO_USER 2 -#ifdef DBUG_OFF -#define plugin_ref_to_int(A) A -#define plugin_int_to_ref(A) A -#else -#define plugin_ref_to_int(A) (A ? A[0] : NULL) -#define plugin_int_to_ref(A) &(A) -#endif - extern struct st_mysql_plugin *mysqld_builtins[]; /** @@ -54,7 +46,8 @@ const LEX_STRING plugin_type_names[MYSQL_MAX_PLUGIN_TYPE_NUM]= { C_STRING_WITH_LEN("STORAGE ENGINE") }, { C_STRING_WITH_LEN("FTPARSER") }, { C_STRING_WITH_LEN("DAEMON") }, - { C_STRING_WITH_LEN("INFORMATION SCHEMA") } + { C_STRING_WITH_LEN("INFORMATION SCHEMA") }, + { C_STRING_WITH_LEN("REPLICATION") }, }; extern int initialize_schema_table(st_plugin_int *plugin); @@ -93,7 +86,8 @@ static int min_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]= MYSQL_HANDLERTON_INTERFACE_VERSION, MYSQL_FTPARSER_INTERFACE_VERSION, MYSQL_DAEMON_INTERFACE_VERSION, - MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION + MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION, + MYSQL_REPLICATION_INTERFACE_VERSION, }; static int cur_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]= { @@ -101,7 +95,8 @@ static int cur_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]= MYSQL_HANDLERTON_INTERFACE_VERSION, MYSQL_FTPARSER_INTERFACE_VERSION, MYSQL_DAEMON_INTERFACE_VERSION, - MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION + MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION, + MYSQL_REPLICATION_INTERFACE_VERSION, }; static bool initialized= 0; diff --git a/sql/sql_plugin.h b/sql/sql_plugin.h index 004d0d5abb7..c6ad846943c 100644 --- a/sql/sql_plugin.h +++ b/sql/sql_plugin.h @@ -18,6 +18,14 @@ class sys_var; +#ifdef DBUG_OFF +#define plugin_ref_to_int(A) A +#define plugin_int_to_ref(A) A +#else +#define plugin_ref_to_int(A) (A ? A[0] : NULL) +#define plugin_int_to_ref(A) &(A) +#endif + /* the following flags are valid for plugin_init() */ diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 5c18ff7690c..5c25d789d5c 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -21,6 +21,7 @@ #include "log_event.h" #include "rpl_filter.h" #include <my_dir.h> +#include "rpl_handler.h" int max_binlog_dump_events = 0; // unlimited my_bool opt_sporadic_binlog_dump_fail = 0; @@ -80,6 +81,32 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name, DBUG_RETURN(0); } +/* + Reset thread transmit packet buffer for event sending + + This function allocates header bytes for event transmission, and + should be called before store the event data to the packet buffer. +*/ +static int reset_transmit_packet(THD *thd, ushort flags, + ulong *ev_offset, const char **errmsg) +{ + int ret= 0; + String *packet= &thd->packet; + + /* reserve and set default header */ + packet->length(0); + packet->set("\0", 1, &my_charset_bin); + + if (RUN_HOOK(binlog_transmit, reserve_header, (thd, flags, packet))) + { + *errmsg= "Failed to run hook 'reserve_header'"; + my_errno= ER_UNKNOWN_ERROR; + ret= 1; + } + *ev_offset= packet->length(); + return ret; +} + static int send_file(THD *thd) { NET* net = &thd->net; @@ -336,6 +363,73 @@ Increase max_allowed_packet on master"; } +/** + An auxiliary function for calling in mysql_binlog_send + to initialize the heartbeat timeout in waiting for a binlogged event. + + @param[in] thd THD to access a user variable + + @return heartbeat period an ulonglong of nanoseconds + or zero if heartbeat was not demanded by slave +*/ +static ulonglong get_heartbeat_period(THD * thd) +{ + my_bool null_value; + LEX_STRING name= { C_STRING_WITH_LEN("master_heartbeat_period")}; + user_var_entry *entry= + (user_var_entry*) hash_search(&thd->user_vars, (uchar*) name.str, + name.length); + return entry? entry->val_int(&null_value) : 0; +} + +/* + Function prepares and sends repliation heartbeat event. + + @param net net object of THD + @param packet buffer to store the heartbeat instance + @param event_coordinates binlog file name and position of the last + real event master sent from binlog + + @note + Among three essential pieces of heartbeat data Log_event::when + is computed locally. + The error to send is serious and should force terminating + the dump thread. +*/ +static int send_heartbeat_event(NET* net, String* packet, + const struct event_coordinates *coord) +{ + DBUG_ENTER("send_heartbeat_event"); + char header[LOG_EVENT_HEADER_LEN]; + /* + 'when' (the timestamp) is set to 0 so that slave could distinguish between + real and fake Rotate events (if necessary) + */ + memset(header, 0, 4); // when + + header[EVENT_TYPE_OFFSET] = HEARTBEAT_LOG_EVENT; + + char* p= coord->file_name + dirname_length(coord->file_name); + + uint ident_len = strlen(p); + ulong event_len = ident_len + LOG_EVENT_HEADER_LEN; + int4store(header + SERVER_ID_OFFSET, server_id); + int4store(header + EVENT_LEN_OFFSET, event_len); + int2store(header + FLAGS_OFFSET, 0); + + int4store(header + LOG_POS_OFFSET, coord->pos); // log_pos + + packet->append(header, sizeof(header)); + packet->append(p, ident_len); // log_file_name + + if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) || + net_flush(net)) + { + DBUG_RETURN(-1); + } + DBUG_RETURN(0); +} + /* TODO: Clean up loop to only have one call to send_file() */ @@ -346,6 +440,9 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, LOG_INFO linfo; char *log_file_name = linfo.log_file_name; char search_file_name[FN_REFLEN], *name; + + ulong ev_offset; + IO_CACHE log; File file = -1; String* packet = &thd->packet; @@ -361,6 +458,30 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos)); bzero((char*) &log,sizeof(log)); + /* + heartbeat_period from @master_heartbeat_period user variable + */ + ulonglong heartbeat_period= get_heartbeat_period(thd); + struct timespec heartbeat_buf; + struct event_coordinates coord_buf; + struct timespec *heartbeat_ts= NULL; + struct event_coordinates *coord= NULL; + if (heartbeat_period != LL(0)) + { + heartbeat_ts= &heartbeat_buf; + set_timespec_nsec(*heartbeat_ts, 0); + coord= &coord_buf; + coord->file_name= log_file_name; // initialization basing on what slave remembers + coord->pos= pos; + } + sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)", + thd->server_id, log_ident, (ulong)pos); + if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos))) + { + errmsg= "Failed to run hook 'transmit_start'"; + my_errno= ER_UNKNOWN_ERROR; + goto err; + } #ifndef DBUG_OFF if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2)) @@ -416,11 +537,9 @@ impossible position"; goto err; } - /* - We need to start a packet with something other than 255 - to distinguish it from error - */ - packet->set("\0", 1, &my_charset_bin); /* This is the start of a new packet */ + /* reset transmit packet for the fake rotate event below */ + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) + goto err; /* Tell the client about the log name with a fake Rotate event; @@ -460,7 +579,7 @@ impossible position"; my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; goto err; } - packet->set("\0", 1, &my_charset_bin); + /* Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become this larger than the corresponding packet (query) sent @@ -476,6 +595,11 @@ impossible position"; log_lock = mysql_bin_log.get_log_lock(); if (pos > BIN_LOG_HEADER_SIZE) { + /* reset transmit packet for the event read from binary log + file */ + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) + goto err; + /* Try to find a Format_description_log_event at the beginning of the binlog @@ -483,29 +607,30 @@ impossible position"; if (!(error = Log_event::read_log_event(&log, packet, log_lock))) { /* - The packet has offsets equal to the normal offsets in a binlog - event +1 (the first character is \0). + The packet has offsets equal to the normal offsets in a + binlog event + ev_offset (the first ev_offset characters are + the header (default \0)). */ DBUG_PRINT("info", ("Looked for a Format_description_log_event, found event type %d", - (*packet)[EVENT_TYPE_OFFSET+1])); - if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT) + (*packet)[EVENT_TYPE_OFFSET+ev_offset])); + if ((*packet)[EVENT_TYPE_OFFSET+ev_offset] == FORMAT_DESCRIPTION_EVENT) { - binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] & + binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] & LOG_EVENT_BINLOG_IN_USE_F); - (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F; + (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F; /* mark that this event with "log_pos=0", so the slave should not increment master's binlog position (rli->group_master_log_pos) */ - int4store((char*) packet->ptr()+LOG_POS_OFFSET+1, 0); + int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, 0); /* if reconnect master sends FD event with `created' as 0 to avoid destroying temp tables. */ int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+ - ST_CREATED_OFFSET+1, (ulong) 0); + ST_CREATED_OFFSET+ev_offset, (ulong) 0); /* send it */ if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) { @@ -531,8 +656,6 @@ impossible position"; Format_description_log_event will be found naturally if it is written. */ } - /* reset the packet as we wrote to it in any case */ - packet->set("\0", 1, &my_charset_bin); } /* end of if (pos > BIN_LOG_HEADER_SIZE); */ else { @@ -544,6 +667,12 @@ impossible position"; while (!net->error && net->vio != 0 && !thd->killed) { + Log_event_type event_type= UNKNOWN_EVENT; + + /* reset the transmit packet for the event read from binary log + file */ + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) + goto err; while (!(error = Log_event::read_log_event(&log, packet, log_lock))) { #ifndef DBUG_OFF @@ -555,16 +684,31 @@ impossible position"; goto err; } #endif + /* + log's filename does not change while it's active + */ + if (coord) + coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET); - if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT) + event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]); + if (event_type == FORMAT_DESCRIPTION_EVENT) { - binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] & + binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] & LOG_EVENT_BINLOG_IN_USE_F); - (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F; + (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F; } - else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT) + else if (event_type == STOP_EVENT) binlog_can_be_corrupted= FALSE; + pos = my_b_tell(&log); + if (RUN_HOOK(binlog_transmit, before_send_event, + (thd, flags, packet, log_file_name, pos))) + { + my_errno= ER_UNKNOWN_ERROR; + errmsg= "run 'before_send_event' hook failed"; + goto err; + } + if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) { errmsg = "Failed on my_net_write()"; @@ -572,9 +716,8 @@ impossible position"; goto err; } - DBUG_PRINT("info", ("log event code %d", - (*packet)[LOG_EVENT_OFFSET+1] )); - if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT) + DBUG_PRINT("info", ("log event code %d", event_type)); + if (event_type == LOAD_EVENT) { if (send_file(thd)) { @@ -583,7 +726,17 @@ impossible position"; goto err; } } - packet->set("\0", 1, &my_charset_bin); + + if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet))) + { + errmsg= "Failed to run hook 'after_send_event'"; + my_errno= ER_UNKNOWN_ERROR; + goto err; + } + + /* reset transmit packet for next loop */ + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) + goto err; } /* @@ -634,6 +787,11 @@ impossible position"; } #endif + /* reset the transmit packet for the event read from binary log + file */ + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) + goto err; + /* No one will update the log while we are reading now, but we'll be quick and just read one record @@ -650,34 +808,86 @@ impossible position"; /* we read successfully, so we'll need to send it to the slave */ pthread_mutex_unlock(log_lock); read_packet = 1; + if (coord) + coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET); + event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]); break; case LOG_READ_EOF: + { + int ret; + ulong signal_cnt; DBUG_PRINT("wait",("waiting for data in binary log")); if (thd->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0) { pthread_mutex_unlock(log_lock); goto end; } - if (!thd->killed) - { - /* Note that the following call unlocks lock_log */ - mysql_bin_log.wait_for_update(thd, 0); - } - else - pthread_mutex_unlock(log_lock); - DBUG_PRINT("wait",("binary log received update")); - break; - default: +#ifndef DBUG_OFF + ulong hb_info_counter= 0; +#endif + signal_cnt= mysql_bin_log.signal_cnt; + do + { + if (coord) + { + DBUG_ASSERT(heartbeat_ts && heartbeat_period != LL(0)); + set_timespec_nsec(*heartbeat_ts, heartbeat_period); + } + ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts); + DBUG_ASSERT(ret == 0 || heartbeat_period != LL(0) && coord != NULL); + if (ret == ETIMEDOUT || ret == ETIME) + { +#ifndef DBUG_OFF + if (hb_info_counter < 3) + { + sql_print_information("master sends heartbeat message"); + hb_info_counter++; + if (hb_info_counter == 3) + sql_print_information("the rest of heartbeat info skipped ..."); + } +#endif + /* reset transmit packet for the heartbeat event */ + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) + goto err; + if (send_heartbeat_event(net, packet, coord)) + { + errmsg = "Failed on my_net_write()"; + my_errno= ER_UNKNOWN_ERROR; + pthread_mutex_unlock(log_lock); + goto err; + } + } + else + { + DBUG_ASSERT(ret == 0 && signal_cnt != mysql_bin_log.signal_cnt || + thd->killed); + DBUG_PRINT("wait",("binary log received update")); + } + } while (signal_cnt == mysql_bin_log.signal_cnt && !thd->killed); + pthread_mutex_unlock(log_lock); + } + break; + + default: pthread_mutex_unlock(log_lock); fatal_error = 1; break; } if (read_packet) - { - thd_proc_info(thd, "Sending binlog event to slave"); + { + thd_proc_info(thd, "Sending binlog event to slave"); + pos = my_b_tell(&log); + if (RUN_HOOK(binlog_transmit, before_send_event, + (thd, flags, packet, log_file_name, pos))) + { + my_errno= ER_UNKNOWN_ERROR; + errmsg= "run 'before_send_event' hook failed"; + goto err; + } + if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ) { errmsg = "Failed on my_net_write()"; @@ -685,7 +895,7 @@ impossible position"; goto err; } - if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT) + if (event_type == LOAD_EVENT) { if (send_file(thd)) { @@ -694,11 +904,13 @@ impossible position"; goto err; } } - packet->set("\0", 1, &my_charset_bin); - /* - No need to net_flush because we will get to flush later when - we hit EOF pretty quick - */ + + if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet))) + { + my_errno= ER_UNKNOWN_ERROR; + errmsg= "Failed to run hook 'after_send_event'"; + goto err; + } } if (fatal_error) @@ -734,6 +946,10 @@ impossible position"; end_io_cache(&log); (void) my_close(file, MYF(MY_WME)); + /* reset transmit packet for the possible fake rotate event */ + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) + goto err; + /* Call fake_rotate_event() in case the previous log (the one which we have just finished reading) did not contain a Rotate event @@ -751,8 +967,8 @@ impossible position"; goto err; } - packet->length(0); - packet->append('\0'); + if (coord) + coord->file_name= log_file_name; // reset to the next } } @@ -760,6 +976,7 @@ end: end_io_cache(&log); (void)my_close(file, MYF(MY_WME)); + RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags)); my_eof(thd); thd_proc_info(thd, "Waiting to finalize termination"); pthread_mutex_lock(&LOCK_thread_count); @@ -770,6 +987,7 @@ end: err: thd_proc_info(thd, "Waiting to finalize termination"); end_io_cache(&log); + RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags)); /* Exclude iteration through thread list this is needed for purge_logs() - it will iterate through @@ -1064,6 +1282,7 @@ int reset_slave(THD *thd, Master_info* mi) goto err; } + RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi)); err: unlock_slave_threads(mi); if (error) @@ -1137,26 +1356,40 @@ bool change_master(THD* thd, Master_info* mi) int thread_mask; const char* errmsg= 0; bool need_relay_log_purge= 1; + bool ret= FALSE; DBUG_ENTER("change_master"); lock_slave_threads(mi); init_thread_mask(&thread_mask,mi,0 /*not inverse*/); + LEX_MASTER_INFO* lex_mi= &thd->lex->mi; if (thread_mask) // We refuse if any slave thread is running { my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0)); - unlock_slave_threads(mi); - DBUG_RETURN(TRUE); + ret= TRUE; + goto err; } thd_proc_info(thd, "Changing master"); - LEX_MASTER_INFO* lex_mi= &thd->lex->mi; + /* + We need to check if there is an empty master_host. Otherwise + change master succeeds, a master.info file is created containing + empty master_host string and when issuing: start slave; an error + is thrown stating that the server is not configured as slave. + (See BUG#28796). + */ + if(lex_mi->host && !*lex_mi->host) + { + my_error(ER_WRONG_ARGUMENTS, MYF(0), "MASTER_HOST"); + unlock_slave_threads(mi); + DBUG_RETURN(TRUE); + } // TODO: see if needs re-write if (init_master_info(mi, master_info_file, relay_log_info_file, 0, thread_mask)) { my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0)); - unlock_slave_threads(mi); - DBUG_RETURN(TRUE); + ret= TRUE; + goto err; } /* @@ -1195,13 +1428,46 @@ bool change_master(THD* thd, Master_info* mi) mi->port = lex_mi->port; if (lex_mi->connect_retry) mi->connect_retry = lex_mi->connect_retry; + if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED) + mi->heartbeat_period = lex_mi->heartbeat_period; + else + mi->heartbeat_period= (float) min(SLAVE_MAX_HEARTBEAT_PERIOD, + (slave_net_timeout/2.0)); + mi->received_heartbeats= LL(0); // counter lives until master is CHANGEd + /* + reset the last time server_id list if the current CHANGE MASTER + is mentioning IGNORE_SERVER_IDS= (...) + */ + if (lex_mi->repl_ignore_server_ids_opt == LEX_MASTER_INFO::LEX_MI_ENABLE) + reset_dynamic(&mi->ignore_server_ids); + for (uint i= 0; i < lex_mi->repl_ignore_server_ids.elements; i++) + { + ulong s_id; + get_dynamic(&lex_mi->repl_ignore_server_ids, (uchar*) &s_id, i); + if (s_id == ::server_id && replicate_same_server_id) + { + my_error(ER_SLAVE_IGNORE_SERVER_IDS, MYF(0), s_id); + ret= TRUE; + goto err; + } + else + { + if (bsearch((const ulong *) &s_id, + mi->ignore_server_ids.buffer, + mi->ignore_server_ids.elements, sizeof(ulong), + (int (*) (const void*, const void*)) + change_master_server_id_cmp) == NULL) + insert_dynamic(&mi->ignore_server_ids, (uchar*) &s_id); + } + } + sort_dynamic(&mi->ignore_server_ids, (qsort_cmp) change_master_server_id_cmp); - if (lex_mi->ssl != LEX_MASTER_INFO::SSL_UNCHANGED) - mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::SSL_ENABLE); + if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED) + mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE); - if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::SSL_UNCHANGED) + if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED) mi->ssl_verify_server_cert= - (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::SSL_ENABLE); + (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE); if (lex_mi->ssl_ca) strmake(mi->ssl_ca, lex_mi->ssl_ca, sizeof(mi->ssl_ca)-1); @@ -1224,9 +1490,11 @@ bool change_master(THD* thd, Master_info* mi) if (lex_mi->relay_log_name) { need_relay_log_purge= 0; - strmake(mi->rli.group_relay_log_name,lex_mi->relay_log_name, + char relay_log_name[FN_REFLEN]; + mi->rli.relay_log.make_log_name(relay_log_name, lex_mi->relay_log_name); + strmake(mi->rli.group_relay_log_name, relay_log_name, sizeof(mi->rli.group_relay_log_name)-1); - strmake(mi->rli.event_relay_log_name,lex_mi->relay_log_name, + strmake(mi->rli.event_relay_log_name, relay_log_name, sizeof(mi->rli.event_relay_log_name)-1); } @@ -1273,8 +1541,8 @@ bool change_master(THD* thd, Master_info* mi) if (flush_master_info(mi, 0)) { my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file"); - unlock_slave_threads(mi); - DBUG_RETURN(TRUE); + ret= TRUE; + goto err; } if (need_relay_log_purge) { @@ -1285,8 +1553,8 @@ bool change_master(THD* thd, Master_info* mi) &errmsg)) { my_error(ER_RELAY_LOG_FAIL, MYF(0), errmsg); - unlock_slave_threads(mi); - DBUG_RETURN(TRUE); + ret= TRUE; + goto err; } } else @@ -1301,8 +1569,8 @@ bool change_master(THD* thd, Master_info* mi) &msg, 0)) { my_error(ER_RELAY_LOG_INIT, MYF(0), msg); - unlock_slave_threads(mi); - DBUG_RETURN(TRUE); + ret= TRUE; + goto err; } } /* @@ -1339,10 +1607,13 @@ bool change_master(THD* thd, Master_info* mi) pthread_cond_broadcast(&mi->data_cond); pthread_mutex_unlock(&mi->rli.data_lock); +err: unlock_slave_threads(mi); thd_proc_info(thd, 0); - my_ok(thd); - DBUG_RETURN(FALSE); + if (ret == FALSE) + my_ok(thd); + delete_dynamic(&lex_mi->repl_ignore_server_ids); //freeing of parser-time alloc + DBUG_RETURN(ret); } @@ -1363,7 +1634,11 @@ int reset_master(THD* thd) ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG)); return 1; } - return mysql_bin_log.reset_logs(thd); + + if (mysql_bin_log.reset_logs(thd)) + return 1; + RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */)); + return 0; } int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1, @@ -1401,6 +1676,7 @@ bool mysql_show_binlog_events(THD* thd) bool ret = TRUE; IO_CACHE log; File file = -1; + MYSQL_BIN_LOG *binary_log= NULL; DBUG_ENTER("mysql_show_binlog_events"); Log_event::init_show_field_list(&field_list); @@ -1411,14 +1687,30 @@ bool mysql_show_binlog_events(THD* thd) Format_description_log_event *description_event= new Format_description_log_event(3); /* MySQL 4.0 by default */ - /* - Wait for handlers to insert any pending information - into the binlog. For e.g. ndb which updates the binlog asynchronously - this is needed so that the uses sees all its own commands in the binlog - */ - ha_binlog_wait(thd); + DBUG_ASSERT(thd->lex->sql_command == SQLCOM_SHOW_BINLOG_EVENTS || + thd->lex->sql_command == SQLCOM_SHOW_RELAYLOG_EVENTS); - if (mysql_bin_log.is_open()) + /* select wich binary log to use: binlog or relay */ + if ( thd->lex->sql_command == SQLCOM_SHOW_BINLOG_EVENTS ) + { + /* + Wait for handlers to insert any pending information + into the binlog. For e.g. ndb which updates the binlog asynchronously + this is needed so that the uses sees all its own commands in the binlog + */ + ha_binlog_wait(thd); + + binary_log= &mysql_bin_log; + } + else /* showing relay log contents */ + { + if (!active_mi) + DBUG_RETURN(TRUE); + + binary_log= &(active_mi->rli.relay_log); + } + + if (binary_log->is_open()) { LEX_MASTER_INFO *lex_mi= &thd->lex->mi; SELECT_LEX_UNIT *unit= &thd->lex->unit; @@ -1426,7 +1718,7 @@ bool mysql_show_binlog_events(THD* thd) my_off_t pos = max(BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly char search_file_name[FN_REFLEN], *name; const char *log_file_name = lex_mi->log_file_name; - pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock(); + pthread_mutex_t *log_lock = binary_log->get_log_lock(); LOG_INFO linfo; Log_event* ev; @@ -1436,13 +1728,13 @@ bool mysql_show_binlog_events(THD* thd) name= search_file_name; if (log_file_name) - mysql_bin_log.make_log_name(search_file_name, log_file_name); + binary_log->make_log_name(search_file_name, log_file_name); else name=0; // Find first log linfo.index_file_offset = 0; - if (mysql_bin_log.find_log_pos(&linfo, name, 1)) + if (binary_log->find_log_pos(&linfo, name, 1)) { errmsg = "Could not find target log"; goto err; @@ -1745,6 +2037,26 @@ public: bool update(THD *thd, set_var *var); }; +static void fix_slave_net_timeout(THD *thd, enum_var_type type) +{ + DBUG_ENTER("fix_slave_net_timeout"); +#ifdef HAVE_REPLICATION + pthread_mutex_lock(&LOCK_active_mi); + DBUG_PRINT("info",("slave_net_timeout=%lu mi->heartbeat_period=%.3f", + slave_net_timeout, + (active_mi? active_mi->heartbeat_period : 0.0))); + if (active_mi && slave_net_timeout < active_mi->heartbeat_period) + push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN, + ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE, + "The currect value for master_heartbeat_period" + " exceeds the new value of `slave_net_timeout' sec." + " A sensible value for the period should be" + " less than the timeout."); + pthread_mutex_unlock(&LOCK_active_mi); +#endif + DBUG_VOID_RETURN; +} + static sys_var_chain vars = { NULL, NULL }; static sys_var_const sys_log_slave_updates(&vars, "log_slave_updates", @@ -1761,6 +2073,16 @@ static sys_var_const sys_relay_log_info_file(&vars, "relay_log_info_file", (uchar*) &relay_log_info_file); static sys_var_bool_ptr sys_relay_log_purge(&vars, "relay_log_purge", &relay_log_purge); +static sys_var_bool_ptr sys_relay_log_recovery(&vars, "relay_log_recovery", + &relay_log_recovery); +static sys_var_uint_ptr sys_sync_binlog_period(&vars, "sync_binlog", + &sync_binlog_period); +static sys_var_uint_ptr sys_sync_relaylog_period(&vars, "sync_relay_log", + &sync_relaylog_period); +static sys_var_uint_ptr sys_sync_relayloginfo_period(&vars, "sync_relay_log_info", + &sync_relayloginfo_period); +static sys_var_uint_ptr sys_sync_masterinfo_period(&vars, "sync_master_info", + &sync_masterinfo_period); static sys_var_const sys_relay_log_space_limit(&vars, "relay_log_space_limit", OPT_GLOBAL, SHOW_LONGLONG, @@ -1770,13 +2092,13 @@ static sys_var_const sys_slave_load_tmpdir(&vars, "slave_load_tmpdir", OPT_GLOBAL, SHOW_CHAR_PTR, (uchar*) &slave_load_tmpdir); static sys_var_long_ptr sys_slave_net_timeout(&vars, "slave_net_timeout", - &slave_net_timeout); + &slave_net_timeout, + fix_slave_net_timeout); static sys_var_const sys_slave_skip_errors(&vars, "slave_skip_errors", OPT_GLOBAL, SHOW_CHAR, (uchar*) slave_skip_error_names); static sys_var_long_ptr sys_slave_trans_retries(&vars, "slave_transaction_retries", &slave_trans_retries); -static sys_var_sync_binlog_period sys_sync_binlog_period(&vars, "sync_binlog", &sync_binlog_period); static sys_var_slave_skip_counter sys_slave_skip_counter(&vars, "sql_slave_skip_counter"); @@ -1818,12 +2140,6 @@ bool sys_var_slave_skip_counter::update(THD *thd, set_var *var) } -bool sys_var_sync_binlog_period::update(THD *thd, set_var *var) -{ - sync_binlog_period= (ulong) var->save_result.ulonglong_value; - return 0; -} - int init_replication_sys_vars() { if (mysql_add_sys_var_chain(vars.first, my_long_options)) @@ -1835,6 +2151,5 @@ int init_replication_sys_vars() return 0; } -#endif /* HAVE_REPLICATION */ - +#endif /* HAVE_REPLICATION */ diff --git a/sql/sql_repl.h b/sql/sql_repl.h index d5c9040f8dc..aa71ac96ff8 100644 --- a/sql/sql_repl.h +++ b/sql/sql_repl.h @@ -13,6 +13,9 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ +#ifndef SQL_REPL_INCLUDED +#define SQL_REPL_INCLUDED + #include "rpl_filter.h" #ifdef HAVE_REPLICATION @@ -65,3 +68,4 @@ int init_replication_sys_vars(); #endif /* HAVE_REPLICATION */ +#endif /* SQL_REPL_INCLUDED */ diff --git a/sql/sql_select.h b/sql/sql_select.h index a0366d47149..12cfebfc374 100644 --- a/sql/sql_select.h +++ b/sql/sql_select.h @@ -1,3 +1,6 @@ +#ifndef SQL_SELECT_INCLUDED +#define SQL_SELECT_INCLUDED + /* Copyright (C) 2000-2006 MySQL AB This program is free software; you can redistribute it and/or modify @@ -736,3 +739,4 @@ inline bool optimizer_flag(THD *thd, uint flag) return (thd->variables.optimizer_switch & flag); } +#endif /* SQL_SELECT_INCLUDED */ diff --git a/sql/sql_servers.h b/sql/sql_servers.h index 63c691893d1..12855f8473c 100644 --- a/sql/sql_servers.h +++ b/sql/sql_servers.h @@ -1,3 +1,6 @@ +#ifndef SQL_SERVERS_INCLUDED +#define SQL_SERVERS_INCLUDED + /* Copyright (C) 2006 MySQL AB This program is free software; you can redistribute it and/or modify @@ -41,3 +44,5 @@ int alter_server(THD *thd, LEX_SERVER_OPTIONS *server_options); /* lookup functions */ FOREIGN_SERVER *get_server_by_name(MEM_ROOT *mem, const char *server_name, FOREIGN_SERVER *server_buffer); + +#endif /* SQL_SERVERS_INCLUDED */ diff --git a/sql/sql_sort.h b/sql/sql_sort.h index 1e9322f7f5b..102b3ef0a11 100644 --- a/sql/sql_sort.h +++ b/sql/sql_sort.h @@ -1,3 +1,6 @@ +#ifndef SQL_SORT_INCLUDED +#define SQL_SORT_INCLUDED + /* Copyright (C) 2000 MySQL AB This program is free software; you can redistribute it and/or modify @@ -87,3 +90,5 @@ int merge_buffers(SORTPARAM *param,IO_CACHE *from_file, BUFFPEK *lastbuff,BUFFPEK *Fb, BUFFPEK *Tb,int flag); void reuse_freed_buff(QUEUE *queue, BUFFPEK *reuse, uint key_length); + +#endif /* SQL_SORT_INCLUDED */ diff --git a/sql/sql_string.h b/sql/sql_string.h index d62908e5d66..7b10aafbff1 100644 --- a/sql/sql_string.h +++ b/sql/sql_string.h @@ -1,3 +1,6 @@ +#ifndef SQL_STRING_INCLUDED +#define SQL_STRING_INCLUDED + /* Copyright (C) 2000 MySQL AB This program is free software; you can redistribute it and/or modify @@ -389,3 +392,5 @@ static inline bool check_if_only_end_space(CHARSET_INFO *cs, char *str, { return str+ cs->cset->scan(cs, str, end, MY_SEQ_SPACES) == end; } + +#endif /* SQL_STRING_INCLUDED */ diff --git a/sql/sql_trigger.h b/sql/sql_trigger.h index f6754a75284..b411acf2ac5 100644 --- a/sql/sql_trigger.h +++ b/sql/sql_trigger.h @@ -1,3 +1,6 @@ +#ifndef SQL_TRIGGER_INCLUDED +#define SQL_TRIGGER_INCLUDED + /* Copyright (C) 2004-2005 MySQL AB This program is free software; you can redistribute it and/or modify @@ -174,3 +177,4 @@ bool load_table_name_for_trigger(THD *thd, const LEX_STRING *trn_path, LEX_STRING *tbl_name); +#endif /* SQL_TRIGGER_INCLUDED */ diff --git a/sql/sql_udf.h b/sql/sql_udf.h index 4b8b492698e..95cb167869e 100644 --- a/sql/sql_udf.h +++ b/sql/sql_udf.h @@ -1,3 +1,6 @@ +#ifndef SQL_UDF_INCLUDED +#define SQL_UDF_INCLUDED + /* Copyright (C) 2000-2001, 2003-2006 MySQL AB This program is free software; you can redistribute it and/or modify @@ -140,3 +143,4 @@ void free_udf(udf_func *udf); int mysql_create_function(THD *thd,udf_func *udf); int mysql_drop_function(THD *thd,const LEX_STRING *name); #endif +#endif /* SQL_UDF_INCLUDED */ diff --git a/sql/sql_view.h b/sql/sql_view.h index e08c2168e14..3de51c3264e 100644 --- a/sql/sql_view.h +++ b/sql/sql_view.h @@ -1,3 +1,6 @@ +#ifndef SQL_VIEW_INCLUDED +#define SQL_VIEW_INCLUDED + /* -*- C++ -*- */ /* Copyright (C) 2004 MySQL AB @@ -42,3 +45,4 @@ bool mysql_rename_view(THD *thd, const char *new_db, const char *new_name, #define VIEW_ANY_ACL (SELECT_ACL | UPDATE_ACL | INSERT_ACL | DELETE_ACL) +#endif /* SQL_VIEW_INCLUDED */ diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy index 1a19be81a44..3be3f804678 100644 --- a/sql/sql_yacc.yy +++ b/sql/sql_yacc.yy @@ -740,6 +740,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize); %token IDENT_QUOTED %token IF %token IGNORE_SYM +%token IGNORE_SERVER_IDS_SYM %token IMPORT %token INDEXES %token INDEX_SYM @@ -814,6 +815,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize); %token MASTER_SSL_VERIFY_SERVER_CERT_SYM %token MASTER_SYM %token MASTER_USER_SYM +%token MASTER_HEARTBEAT_PERIOD_SYM %token MATCH /* SQL-2003-R */ %token MAX_CONNECTIONS_PER_HOUR %token MAX_QUERIES_PER_HOUR @@ -930,6 +932,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize); %token REDUNDANT_SYM %token REFERENCES /* SQL-2003-R */ %token REGEXP +%token RELAYLOG_SYM %token RELAY_LOG_FILE_SYM %token RELAY_LOG_POS_SYM %token RELAY_THREAD @@ -1558,6 +1561,12 @@ change: LEX *lex = Lex; lex->sql_command = SQLCOM_CHANGE_MASTER; bzero((char*) &lex->mi, sizeof(lex->mi)); + /* + resetting flags that can left from the previous CHANGE MASTER + */ + lex->mi.repl_ignore_server_ids_opt= LEX_MASTER_INFO::LEX_MI_UNCHANGED; + my_init_dynamic_array(&Lex->mi.repl_ignore_server_ids, + sizeof(::server_id), 16, 16); } master_defs {} @@ -1592,7 +1601,7 @@ master_def: | MASTER_SSL_SYM EQ ulong_num { Lex->mi.ssl= $3 ? - LEX_MASTER_INFO::SSL_ENABLE : LEX_MASTER_INFO::SSL_DISABLE; + LEX_MASTER_INFO::LEX_MI_ENABLE : LEX_MASTER_INFO::LEX_MI_DISABLE; } | MASTER_SSL_CA_SYM EQ TEXT_STRING_sys { @@ -1617,11 +1626,69 @@ master_def: | MASTER_SSL_VERIFY_SERVER_CERT_SYM EQ ulong_num { Lex->mi.ssl_verify_server_cert= $3 ? - LEX_MASTER_INFO::SSL_ENABLE : LEX_MASTER_INFO::SSL_DISABLE; + LEX_MASTER_INFO::LEX_MI_ENABLE : LEX_MASTER_INFO::LEX_MI_DISABLE; } - | master_file_def + + | MASTER_HEARTBEAT_PERIOD_SYM EQ NUM_literal + { + Lex->mi.heartbeat_period= (float) $3->val_real(); + if (Lex->mi.heartbeat_period > SLAVE_MAX_HEARTBEAT_PERIOD || + Lex->mi.heartbeat_period < 0.0) + { + const char format[]= "%d seconds"; + char buf[4*sizeof(SLAVE_MAX_HEARTBEAT_PERIOD) + sizeof(format)]; + my_sprintf(buf, (buf, format, SLAVE_MAX_HEARTBEAT_PERIOD)); + my_error(ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE, + MYF(0), + " is negative or exceeds the maximum ", + buf); + MYSQL_YYABORT; + } + if (Lex->mi.heartbeat_period > slave_net_timeout) + { + push_warning_printf(YYTHD, MYSQL_ERROR::WARN_LEVEL_WARN, + ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE, + ER(ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE), + " exceeds the value of `slave_net_timeout' sec.", + " A sensible value for the period should be" + " less than the timeout."); + } + if (Lex->mi.heartbeat_period < 0.001) + { + if (Lex->mi.heartbeat_period != 0.0) + { + push_warning_printf(YYTHD, MYSQL_ERROR::WARN_LEVEL_WARN, + ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE, + ER(ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE), + " is less than 1 msec.", + " The period is reset to zero which means" + " no heartbeats will be sending"); + Lex->mi.heartbeat_period= 0.0; + } + Lex->mi.heartbeat_opt= LEX_MASTER_INFO::LEX_MI_DISABLE; + } + Lex->mi.heartbeat_opt= LEX_MASTER_INFO::LEX_MI_ENABLE; + } + | IGNORE_SERVER_IDS_SYM EQ '(' ignore_server_id_list ')' + { + Lex->mi.repl_ignore_server_ids_opt= LEX_MASTER_INFO::LEX_MI_ENABLE; + } + | + master_file_def ; +ignore_server_id_list: + /* Empty */ + | ignore_server_id + | ignore_server_id_list ',' ignore_server_id + ; + +ignore_server_id: + ulong_num + { + insert_dynamic(&Lex->mi.repl_ignore_server_ids, (uchar*) &($1)); + } + master_file_def: MASTER_LOG_FILE_SYM EQ TEXT_STRING_sys { @@ -9959,6 +10026,11 @@ show_param: LEX *lex= Lex; lex->sql_command= SQLCOM_SHOW_BINLOG_EVENTS; } opt_limit_clause_init + | RELAYLOG_SYM EVENTS_SYM binlog_in binlog_from + { + LEX *lex= Lex; + lex->sql_command= SQLCOM_SHOW_RELAYLOG_EVENTS; + } opt_limit_clause_init | keys_or_index from_or_in table_ident opt_db where_clause { LEX *lex= Lex; @@ -11600,6 +11672,7 @@ keyword_sp: | REDO_BUFFER_SIZE_SYM {} | REDOFILE_SYM {} | REDUNDANT_SYM {} + | RELAYLOG_SYM {} | RELAY_LOG_FILE_SYM {} | RELAY_LOG_POS_SYM {} | RELAY_THREAD {} diff --git a/sql/structs.h b/sql/structs.h index a58c18f97c5..bcda7b1e787 100644 --- a/sql/structs.h +++ b/sql/structs.h @@ -1,3 +1,6 @@ +#ifndef STRUCTS_INCLUDED +#define STRUCTS_INCLUDED + /* Copyright (C) 2000-2006 MySQL AB This program is free software; you can redistribute it and/or modify @@ -380,3 +383,5 @@ public: Discrete_interval* get_tail() const { return tail; }; Discrete_interval* get_current() const { return current; }; }; + +#endif /* STRUCTS_INCLUDED */ diff --git a/sql/table.h b/sql/table.h index 40372fa91cf..7c43ab339e5 100644 --- a/sql/table.h +++ b/sql/table.h @@ -1,3 +1,6 @@ +#ifndef TABLE_INCLUDED +#define TABLE_INCLUDED + /* Copyright 2000-2008 MySQL AB, 2008 Sun Microsystems, Inc. This program is free software; you can redistribute it and/or modify @@ -1729,3 +1732,4 @@ static inline void dbug_tmp_restore_column_maps(MY_BITMAP *read_set, size_t max_row_length(TABLE *table, const uchar *data); +#endif /* TABLE_INCLUDED */ diff --git a/sql/tzfile.h b/sql/tzfile.h index 1ff82d62329..1c1800ba1ed 100644 --- a/sql/tzfile.h +++ b/sql/tzfile.h @@ -1,3 +1,6 @@ +#ifndef TZFILE_INCLUDED +#define TZFILE_INCLUDED + /* Copyright (C) 2004 MySQL AB This program is free software; you can redistribute it and/or modify @@ -134,3 +137,5 @@ struct tzhead { */ #define isleap(y) (((y) % 4) == 0 && (((y) % 100) != 0 || ((y) % 400) == 0)) + +#endif diff --git a/sql/tztime.h b/sql/tztime.h index 9bf103519c4..9990e91f17b 100644 --- a/sql/tztime.h +++ b/sql/tztime.h @@ -1,3 +1,6 @@ +#ifndef TZTIME_INCLUDED +#define TZTIME_INCLUDED + /* Copyright (C) 2004 MySQL AB This program is free software; you can redistribute it and/or modify @@ -79,3 +82,4 @@ static const int MY_TZ_TABLES_COUNT= 4; #endif /* !defined(TESTTIME) && !defined(TZINFO2SQL) */ +#endif /* TZTIME_INCLUDED */ diff --git a/sql/unireg.h b/sql/unireg.h index 3ff7f058e3c..6c9080aea79 100644 --- a/sql/unireg.h +++ b/sql/unireg.h @@ -1,3 +1,6 @@ +#ifndef UNIREG_INCLUDED +#define UNIREG_INCLUDED + /* Copyright (C) 2000-2006 MySQL AB This program is free software; you can redistribute it and/or modify @@ -16,8 +19,6 @@ /* Extra functions used by unireg library */ -#ifndef _unireg_h - #ifndef NO_ALARM_LOOP #define NO_ALARM_LOOP /* lib5 and popen can't use alarm */ #endif |