summaryrefslogtreecommitdiff
path: root/sql/rpl_rli.h
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2013-11-01 09:17:06 +0100
committerunknown <knielsen@knielsen-hq.org>2013-11-01 09:17:06 +0100
commitcb86ce60b9bade5ae7712d8f3f74668208ee3fd2 (patch)
treedaff81c02baa6c2581d6abe3d746b8f35ee44f32 /sql/rpl_rli.h
parentf4d5d849fd3b526d38ca6eb083fd0b290eb0eda7 (diff)
parent39df665a3332bd9bfb2529419f534a49cfac388c (diff)
downloadmariadb-git-cb86ce60b9bade5ae7712d8f3f74668208ee3fd2.tar.gz
Merge MDEV-4506: Parallel replication into 10.0-base.
Diffstat (limited to 'sql/rpl_rli.h')
-rw-r--r--sql/rpl_rli.h386
1 files changed, 244 insertions, 142 deletions
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 9ab5dcb30a5..a3dcf7ad7e9 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -22,6 +22,7 @@
#include "log.h" /* LOG_INFO, MYSQL_BIN_LOG */
#include "sql_class.h" /* THD */
#include "log_event.h"
+#include "rpl_parallel.h"
struct RPL_TABLE_LIST;
class Master_info;
@@ -52,18 +53,20 @@ class Master_info;
*****************************************************************************/
+struct rpl_group_info;
+
class Relay_log_info : public Slave_reporting_capability
{
public:
/**
- Flags for the state of the replication.
- */
+ Flags for the state of reading the relay log. Note that these are
+ bit masks.
+ */
enum enum_state_flag {
- /** The replication thread is inside a statement */
- IN_STMT,
-
- /** Flag counter. Should always be last */
- STATE_FLAGS_COUNT
+ /** We are inside a group of events forming a statement */
+ IN_STMT=1,
+ /** We are inside a transaction */
+ IN_TRANSACTION=2
};
/*
@@ -128,9 +131,14 @@ public:
IO_CACHE info_file;
/*
- When we restart slave thread we need to have access to the previously
- created temporary tables. Modified only on init/end and by the SQL
- thread, read only by SQL thread.
+ List of temporary tables used by this connection.
+ This is updated when a temporary table is created or dropped by
+ a replication thread.
+
+ Not reset when replication ends, to allow one to access the tables
+ when replication restarts.
+
+ Protected by data_lock.
*/
TABLE *save_temporary_tables;
@@ -138,13 +146,13 @@ public:
standard lock acquisition order to avoid deadlocks:
run_lock, data_lock, relay_log.LOCK_log, relay_log.LOCK_index
*/
- mysql_mutex_t data_lock, run_lock, sleep_lock;
+ mysql_mutex_t data_lock, run_lock;
/*
start_cond is broadcast when SQL thread is started
stop_cond - when stopped
data_cond - when data protected by data_lock changes
*/
- mysql_cond_t start_cond, stop_cond, data_cond, sleep_cond;
+ mysql_cond_t start_cond, stop_cond, data_cond;
/* parent Master_info structure */
Master_info *mi;
@@ -161,8 +169,8 @@ public:
- an autocommiting query + its associated events (INSERT_ID,
TIMESTAMP...)
We need these rli coordinates :
- - relay log name and position of the beginning of the group we currently are
- executing. Needed to know where we have to restart when replication has
+ - relay log name and position of the beginning of the group we currently
+ are executing. Needed to know where we have to restart when replication has
stopped in the middle of a group (which has been rolled back by the slave).
- relay log name and position just after the event we have just
executed. This event is part of the current group.
@@ -177,6 +185,10 @@ public:
char event_relay_log_name[FN_REFLEN];
ulonglong event_relay_log_pos;
ulonglong future_event_relay_log_pos;
+ /*
+ The master log name for current event. Only used in parallel replication.
+ */
+ char future_event_master_log_name[FN_REFLEN];
#ifdef HAVE_valgrind
bool is_fake; /* Mark that this is a fake relay log info structure */
@@ -208,18 +220,6 @@ public:
*/
bool sql_force_rotate_relay;
- /*
- When it commits, InnoDB internally stores the master log position it has
- processed so far; the position to store is the one of the end of the
- committing event (the COMMIT query event, or the event if in autocommit
- mode).
- */
-#if MYSQL_VERSION_ID < 40100
- ulonglong future_master_log_pos;
-#else
- ulonglong future_group_master_log_pos;
-#endif
-
time_t last_master_timestamp;
void clear_until_condition();
@@ -236,7 +236,13 @@ public:
ulong max_relay_log_size;
mysql_mutex_t log_space_lock;
mysql_cond_t log_space_cond;
- THD * sql_thd;
+ /*
+ THD for the main sql thread, the one that starts threads to process
+ slave requests. If there is only one thread, then this THD is also
+ used for SQL processing.
+ A kill sent to this THD will kill the replication.
+ */
+ THD *sql_driver_thd;
#ifndef DBUG_OFF
int events_till_abort;
#endif
@@ -284,14 +290,16 @@ public:
char cached_charset[6];
/*
- trans_retries varies between 0 to slave_transaction_retries and counts how
- many times the slave has retried the present transaction; gets reset to 0
- when the transaction finally succeeds. retried_trans is a cumulative
- counter: how many times the slave has retried a transaction (any) since
- slave started.
+ retried_trans is a cumulative counter: how many times the slave
+ has retried a transaction (any) since slave started.
+ Protected by data_lock.
+ */
+ ulong retried_trans;
+ /*
+ Number of executed events for SLAVE STATUS.
+ Protected by slave_executed_entries_lock
*/
- ulong trans_retries, retried_trans;
- ulong executed_entries; /* For SLAVE STATUS */
+ int64 executed_entries;
/*
If the end of the hot relay log is made of master's events ignored by the
@@ -313,13 +321,7 @@ public:
char slave_patternload_file[FN_REFLEN];
size_t slave_patternload_file_size;
- /*
- Current GTID being processed.
- The sub_id gives the binlog order within one domain_id. A zero sub_id
- means that there is no active GTID.
- */
- uint64 gtid_sub_id;
- rpl_gtid current_gtid;
+ rpl_parallel parallel;
Relay_log_info(bool is_slave_recovery);
~Relay_log_info();
@@ -343,13 +345,9 @@ public:
if (until_condition==UNTIL_MASTER_POS)
until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_UNKNOWN;
}
-
- inline void inc_event_relay_log_pos()
- {
- event_relay_log_pos= future_event_relay_log_pos;
- }
void inc_group_relay_log_pos(ulonglong log_pos,
+ rpl_group_info *rgi,
bool skip_lock=0);
int wait_for_pos(THD* thd, String* log_name, longlong log_pos,
@@ -366,27 +364,6 @@ public:
group_relay_log_pos);
}
- RPL_TABLE_LIST *tables_to_lock; /* RBR: Tables to lock */
- uint tables_to_lock_count; /* RBR: Count of tables to lock */
- table_mapping m_table_map; /* RBR: Mapping table-id to table */
-
- bool get_table_data(TABLE *table_arg, table_def **tabledef_var, TABLE **conv_table_var) const
- {
- DBUG_ASSERT(tabledef_var && conv_table_var);
- for (TABLE_LIST *ptr= tables_to_lock ; ptr != NULL ; ptr= ptr->next_global)
- if (ptr->table == table_arg)
- {
- *tabledef_var= &static_cast<RPL_TABLE_LIST*>(ptr)->m_tabledef;
- *conv_table_var= static_cast<RPL_TABLE_LIST*>(ptr)->m_conv_table;
- DBUG_PRINT("debug", ("Fetching table data for table %s.%s:"
- " tabledef: %p, conv_table: %p",
- table_arg->s->db.str, table_arg->s->table_name.str,
- *tabledef_var, *conv_table_var));
- return true;
- }
- return false;
- }
-
/*
Last charset (6 bytes) seen by slave SQL thread is cached here; it helps
the thread save 3 get_charset() per Query_log_event if the charset is not
@@ -396,52 +373,6 @@ public:
void cached_charset_invalidate();
bool cached_charset_compare(char *charset) const;
- void cleanup_context(THD *, bool);
- void slave_close_thread_tables(THD *);
- void clear_tables_to_lock();
-
- /*
- Used to defer stopping the SQL thread to give it a chance
- to finish up the current group of events.
- The timestamp is set and reset in @c sql_slave_killed().
- */
- time_t last_event_start_time;
-
- /*
- A container to hold on Intvar-, Rand-, Uservar- log-events in case
- the slave is configured with table filtering rules.
- The withhold events are executed when their parent Query destiny is
- determined for execution as well.
- */
- Deferred_log_events *deferred_events;
-
- /*
- State of the container: true stands for IRU events gathering,
- false does for execution, either deferred or direct.
- */
- bool deferred_events_collecting;
-
- /*
- Returns true if the argument event resides in the containter;
- more specifically, the checking is done against the last added event.
- */
- bool is_deferred_event(Log_event * ev)
- {
- return deferred_events_collecting ? deferred_events->is_last(ev) : false;
- };
- /* The general cleanup that slave applier may need at the end of query. */
- inline void cleanup_after_query()
- {
- if (deferred_events)
- deferred_events->rewind();
- };
- /* The general cleanup that slave applier may need at the end of session. */
- void cleanup_after_session()
- {
- if (deferred_events)
- delete deferred_events;
- };
-
/**
Helper function to do after statement completion.
@@ -461,8 +392,28 @@ public:
the <code>Seconds_behind_master</code> field.
*/
void stmt_done(my_off_t event_log_pos,
- time_t event_creation_time, THD *thd);
+ time_t event_creation_time, THD *thd,
+ rpl_group_info *rgi);
+
+ /**
+ Is the replication inside a group?
+ The reader of the relay log is inside a group if either:
+ - The IN_TRANSACTION flag is set, meaning we're inside a transaction
+ - The IN_STMT flag is set, meaning we have read at least one row from
+ a multi-event entry.
+
+ This flag reflects the state of the log 'just now', ie after the last
+ read event would be executed.
+ This allows us to test if we can stop replication before reading
+ the next entry.
+
+ @retval true Replication thread is currently inside a group
+ @retval false Replication thread is currently not inside a group
+ */
+ bool is_in_group() const {
+ return (m_flags & (IN_STMT | IN_TRANSACTION));
+ }
/**
Set the value of a replication state flag.
@@ -471,7 +422,7 @@ public:
*/
void set_flag(enum_state_flag flag)
{
- m_flags |= (1UL << flag);
+ m_flags|= flag;
}
/**
@@ -483,7 +434,7 @@ public:
*/
bool get_flag(enum_state_flag flag)
{
- return m_flags & (1UL << flag);
+ return m_flags & flag;
}
/**
@@ -493,23 +444,156 @@ public:
*/
void clear_flag(enum_state_flag flag)
{
- m_flags &= ~(1UL << flag);
+ m_flags&= ~flag;
}
- /**
- Is the replication inside a group?
+private:
- Replication is inside a group if either:
- - The OPTION_BEGIN flag is set, meaning we're inside a transaction
- - The RLI_IN_STMT flag is set, meaning we're inside a statement
+ /*
+ Holds the state of the data in the relay log.
+ We need this to ensure that we are not in the middle of a
+ statement or inside BEGIN ... COMMIT when we should rotate the
+ relay log.
+ */
+ uint32 m_flags;
+};
- @retval true Replication thread is currently inside a group
- @retval false Replication thread is currently not inside a group
+
+/*
+ This is data for various state needed to be kept for the processing of
+ one event group (transaction) during replication.
+
+ In single-threaded replication, there will be one global rpl_group_info and
+ one global Relay_log_info per master connection. They will be linked
+ together.
+
+ In parallel replication, there will be one rpl_group_info object for
+ each running sql thread, each having their own thd.
+
+ All rpl_group_info will share the same Relay_log_info.
+*/
+
+struct rpl_group_info
+{
+ Relay_log_info *rli;
+ THD *thd;
+ /*
+ Current GTID being processed.
+ The sub_id gives the binlog order within one domain_id. A zero sub_id
+ means that there is no active GTID.
+ */
+ uint64 gtid_sub_id;
+ rpl_gtid current_gtid;
+ /*
+ This is used to keep transaction commit order.
+ We will signal this when we commit, and can register it to wait for the
+ commit_orderer of the previous commit to signal us.
+ */
+ wait_for_commit commit_orderer;
+ /*
+ If non-zero, the sub_id of a prior event group whose commit we have to wait
+ for before committing ourselves. Then wait_commit_group_info points to the
+ event group to wait for.
+
+ Before using this, rpl_parallel_entry::last_committed_sub_id should be
+ compared against wait_commit_sub_id. Only if last_committed_sub_id is
+ smaller than wait_commit_sub_id must the wait be done (otherwise the
+ waited-for transaction is already committed, so we would otherwise wait
+ for the wrong commit).
+ */
+ uint64 wait_commit_sub_id;
+ rpl_group_info *wait_commit_group_info;
+ /*
+ If non-zero, the event group must wait for this sub_id to be committed
+ before the execution of the event group is allowed to start.
+
+ (When we execute in parallel the transactions that group committed
+ together on the master, we still need to wait for any prior transactions
+ to have committed).
+ */
+ uint64 wait_start_sub_id;
+
+ struct rpl_parallel_entry *parallel_entry;
+
+ /*
+ A container to hold on Intvar-, Rand-, Uservar- log-events in case
+ the slave is configured with table filtering rules.
+ The withheld events are executed when their parent Query destiny is
+ determined for execution as well.
+ */
+ Deferred_log_events *deferred_events;
+
+ /*
+ State of the container: true stands for IRU events gathering,
+ false does for execution, either deferred or direct.
+ */
+ bool deferred_events_collecting;
+
+ Annotate_rows_log_event *m_annotate_event;
+
+ RPL_TABLE_LIST *tables_to_lock; /* RBR: Tables to lock */
+ uint tables_to_lock_count; /* RBR: Count of tables to lock */
+ table_mapping m_table_map; /* RBR: Mapping table-id to table */
+ mysql_mutex_t sleep_lock;
+ mysql_cond_t sleep_cond;
+
+ /*
+ trans_retries varies between 0 to slave_transaction_retries and counts how
+ many times the slave has retried the present transaction; gets reset to 0
+ when the transaction finally succeeds.
+ */
+ ulong trans_retries;
+
+ /*
+ Used to defer stopping the SQL thread to give it a chance
+ to finish up the current group of events.
+ The timestamp is set and reset in @c sql_slave_killed().
+ */
+ time_t last_event_start_time;
+
+ char *event_relay_log_name;
+ char event_relay_log_name_buf[FN_REFLEN];
+ ulonglong event_relay_log_pos;
+ ulonglong future_event_relay_log_pos;
+ /*
+ The master log name for current event. Only used in parallel replication.
+ */
+ char future_event_master_log_name[FN_REFLEN];
+ bool is_parallel_exec;
+ bool is_error;
+
+private:
+ /*
+ Runtime state for printing a note when slave is taking
+ too long while processing a row event.
*/
- bool is_in_group() const {
- return (sql_thd->variables.option_bits & OPTION_BEGIN) ||
- (m_flags & (1UL << IN_STMT));
- }
+ time_t row_stmt_start_timestamp;
+ bool long_find_row_note_printed;
+public:
+
+ rpl_group_info(Relay_log_info *rli_);
+ ~rpl_group_info();
+
+ /*
+ Returns true if the argument event resides in the container;
+ more specifically, the checking is done against the last added event.
+ */
+ bool is_deferred_event(Log_event * ev)
+ {
+ return deferred_events_collecting ? deferred_events->is_last(ev) : false;
+ };
+ /* The general cleanup that slave applier may need at the end of query. */
+ inline void cleanup_after_query()
+ {
+ if (deferred_events)
+ deferred_events->rewind();
+ };
+ /* The general cleanup that slave applier may need at the end of session. */
+ void cleanup_after_session()
+ {
+ if (deferred_events)
+ delete deferred_events;
+ };
/**
Save pointer to Annotate_rows event and switch on the
@@ -520,7 +604,7 @@ public:
{
free_annotate_event();
m_annotate_event= event;
- sql_thd->variables.binlog_annotate_row_events= 1;
+ this->thd->variables.binlog_annotate_row_events= 1;
}
/**
@@ -542,12 +626,33 @@ public:
{
if (m_annotate_event)
{
- sql_thd->variables.binlog_annotate_row_events= 0;
+ this->thd->variables.binlog_annotate_row_events= 0;
delete m_annotate_event;
m_annotate_event= 0;
}
}
+ bool get_table_data(TABLE *table_arg, table_def **tabledef_var, TABLE **conv_table_var) const
+ {
+ DBUG_ASSERT(tabledef_var && conv_table_var);
+ for (TABLE_LIST *ptr= tables_to_lock ; ptr != NULL ; ptr= ptr->next_global)
+ if (ptr->table == table_arg)
+ {
+ *tabledef_var= &static_cast<RPL_TABLE_LIST*>(ptr)->m_tabledef;
+ *conv_table_var= static_cast<RPL_TABLE_LIST*>(ptr)->m_conv_table;
+ DBUG_PRINT("debug", ("Fetching table data for table %s.%s:"
+ " tabledef: %p, conv_table: %p",
+ table_arg->s->db.str, table_arg->s->table_name.str,
+ *tabledef_var, *conv_table_var));
+ return true;
+ }
+ return false;
+ }
+
+ void clear_tables_to_lock();
+ void cleanup_context(THD *, bool);
+ void slave_close_thread_tables(THD *);
+
time_t get_row_stmt_start_timestamp()
{
return row_stmt_start_timestamp;
@@ -581,18 +686,12 @@ public:
return long_find_row_note_printed;
}
-private:
-
- uint32 m_flags;
-
- /*
- Runtime state for printing a note when slave is taking
- too long while processing a row event.
- */
- time_t row_stmt_start_timestamp;
- bool long_find_row_note_printed;
-
- Annotate_rows_log_event *m_annotate_event;
+ inline void inc_event_relay_log_pos()
+ {
+ if (!is_parallel_exec ||
+ rli->event_relay_log_pos < future_event_relay_log_pos)
+ rli->event_relay_log_pos= future_event_relay_log_pos;
+ }
};
@@ -603,5 +702,8 @@ int init_relay_log_info(Relay_log_info* rli, const char* info_fname);
extern struct rpl_slave_state rpl_global_gtid_slave_state;
int rpl_load_gtid_slave_state(THD *thd);
+int event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev);
+void delete_or_keep_event_post_apply(rpl_group_info *rgi,
+ Log_event_type typ, Log_event *ev);
#endif /* RPL_RLI_H */