summaryrefslogtreecommitdiff
path: root/sql/rpl_parallel.h
diff options
context:
space:
mode:
Diffstat (limited to 'sql/rpl_parallel.h')
-rw-r--r--sql/rpl_parallel.h146
1 files changed, 126 insertions, 20 deletions
diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
index 019a354c57d..90649230f98 100644
--- a/sql/rpl_parallel.h
+++ b/sql/rpl_parallel.h
@@ -9,16 +9,66 @@ struct rpl_parallel_entry;
struct rpl_parallel_thread_pool;
class Relay_log_info;
+
+
+/*
+ Structure used to keep track of the parallel replication of a batch of
+ event-groups that group-committed together on the master.
+
+ It is used to ensure that every event group in one batch has reached the
+ commit stage before the next batch starts executing.
+
+ Note the lifetime of this structure:
+
+ - It is allocated when the first event in a new batch of group commits
+ is queued, from the free list rpl_parallel_entry::gco_free_list.
+
+ - The gco for the batch currently being queued is owned by
+ rpl_parallel_entry::current_gco. The gco for a previous batch that has
+ been fully queued is owned by the gco->prev_gco pointer of the gco for
+ the following batch.
+
+ - The worker thread waits on gco->COND_group_commit_orderer for
+ rpl_parallel_entry::count_committing_event_groups to reach wait_count
+ before starting; the first waiter links the gco into the next_gco
+ pointer of the gco of the previous batch for signalling.
+
+ - When an event group reaches the commit stage, it signals the
+ COND_group_commit_orderer if its gco->next_gco pointer is non-NULL and
+ rpl_parallel_entry::count_committing_event_groups has reached
+ gco->next_gco->wait_count.
+
+ - When gco->wait_count is reached for a worker and the wait completes,
+ the worker frees gco->prev_gco; at this point it is guaranteed not to
+ be needed any longer.
+*/
+struct group_commit_orderer {
+ /* Wakeup condition, used with rpl_parallel_entry::LOCK_parallel_entry. */
+ mysql_cond_t COND_group_commit_orderer;
+ uint64 wait_count;
+ group_commit_orderer *prev_gco;
+ group_commit_orderer *next_gco;
+ bool installed;
+};
+
+
struct rpl_parallel_thread {
bool delay_start;
bool running;
bool stop;
mysql_mutex_t LOCK_rpl_thread;
mysql_cond_t COND_rpl_thread;
+ mysql_cond_t COND_rpl_thread_queue;
struct rpl_parallel_thread *next; /* For free list. */
struct rpl_parallel_thread_pool *pool;
THD *thd;
- struct rpl_parallel_entry *current_entry;
+ /*
+ Who owns the thread, if any (it's a pointer into the
+ rpl_parallel_entry::rpl_threads array.
+ */
+ struct rpl_parallel_thread **current_owner;
+ /* The rpl_parallel_entry of the owner. */
+ rpl_parallel_entry *current_entry;
struct queued_event {
queued_event *next;
Log_event *ev;
@@ -31,6 +81,9 @@ struct rpl_parallel_thread {
size_t event_size;
} *event_queue, *last_in_queue;
uint64 queued_size;
+ queued_event *qev_free_list;
+ rpl_group_info *rgi_free_list;
+ group_commit_orderer *gco_free_list;
void enqueue(queued_event *qev)
{
@@ -42,15 +95,25 @@ struct rpl_parallel_thread {
queued_size+= qev->event_size;
}
- void dequeue(queued_event *list)
+ void dequeue1(queued_event *list)
{
- queued_event *tmp;
-
DBUG_ASSERT(list == event_queue);
event_queue= last_in_queue= NULL;
- for (tmp= list; tmp; tmp= tmp->next)
- queued_size-= tmp->event_size;
}
+
+ void dequeue2(size_t dequeue_size)
+ {
+ queued_size-= dequeue_size;
+ }
+
+ queued_event *get_qev(Log_event *ev, ulonglong event_size,
+ Relay_log_info *rli);
+ void free_qev(queued_event *qev);
+ rpl_group_info *get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
+ rpl_parallel_entry *e);
+ void free_rgi(rpl_group_info *rgi);
+ group_commit_orderer *get_gco(uint64 wait_count, group_commit_orderer *prev);
+ void free_gco(group_commit_orderer *gco);
};
@@ -66,14 +129,16 @@ struct rpl_parallel_thread_pool {
rpl_parallel_thread_pool();
int init(uint32 size);
void destroy();
- struct rpl_parallel_thread *get_thread(rpl_parallel_entry *entry);
+ struct rpl_parallel_thread *get_thread(rpl_parallel_thread **owner,
+ rpl_parallel_entry *entry);
+ void release_thread(rpl_parallel_thread *rpt);
};
struct rpl_parallel_entry {
+ mysql_mutex_t LOCK_parallel_entry;
+ mysql_cond_t COND_parallel_entry;
uint32 domain_id;
- uint32 last_server_id;
- uint64 last_seq_no;
uint64 last_commit_id;
bool active;
/*
@@ -82,15 +147,41 @@ struct rpl_parallel_entry {
waiting for event groups to complete.
*/
bool force_abort;
+ /*
+ At STOP SLAVE (force_abort=true), we do not want to process all events in
+ the queue (which could unnecessarily delay stop, if a lot of events happen
+ to be queued). The stop_count provides a safe point at which to stop, so
+ that everything before becomes committed and nothing after does. The value
+ corresponds to group_commit_orderer::wait_count; if wait_count is less than
+ or equal to stop_count, we execute the associated event group, else we
+ skip it (and all following) and stop.
+ */
+ uint64 stop_count;
- rpl_parallel_thread *rpl_thread;
+ /*
+ Cyclic array recording the last rpl_thread_max worker threads that we
+ queued event for. This is used to limit how many workers a single domain
+ can occupy (--slave-domain-parallel-threads).
+
+ Note that workers are never explicitly deleted from the array. Instead,
+ we need to check (under LOCK_rpl_thread) that the thread still belongs
+ to us before re-using (rpl_thread::current_owner).
+ */
+ rpl_parallel_thread **rpl_threads;
+ uint32 rpl_thread_max;
+ uint32 rpl_thread_idx;
/*
The sub_id of the last transaction to commit within this domain_id.
Must be accessed under LOCK_parallel_entry protection.
+
+ Event groups commit in order, so the rpl_group_info for an event group
+ will be alive (at least) as long as
+ rpl_grou_info::gtid_sub_id > last_committed_sub_id. This can be used to
+ safely refer back to previous event groups if they are still executing,
+ and ignore them if they completed, without requiring explicit
+ synchronisation between the threads.
*/
uint64 last_committed_sub_id;
- mysql_mutex_t LOCK_parallel_entry;
- mysql_cond_t COND_parallel_entry;
/*
The sub_id of the last event group in this replication domain that was
queued for execution by a worker thread.
@@ -98,14 +189,29 @@ struct rpl_parallel_entry {
uint64 current_sub_id;
rpl_group_info *current_group_info;
/*
- The sub_id of the last event group in the previous batch of group-committed
- transactions.
-
- When we spawn parallel worker threads for the next group-committed batch,
- they first need to wait for this sub_id to be committed before it is safe
- to start executing them.
+ If we get an error in some event group, we set the sub_id of that event
+ group here. Then later event groups (with higher sub_id) can know not to
+ try to start (event groups that already started will be rolled back when
+ wait_for_prior_commit() returns error).
+ The value is ULONGLONG_MAX when no error occured.
+ */
+ uint64 stop_on_error_sub_id;
+ /* Total count of event groups queued so far. */
+ uint64 count_queued_event_groups;
+ /*
+ Count of event groups that have started (but not necessarily completed)
+ the commit phase. We use this to know when every event group in a previous
+ batch of master group commits have started committing on the slave, so
+ that it is safe to start executing the events in the following batch.
*/
- uint64 prev_groupcommit_sub_id;
+ uint64 count_committing_event_groups;
+ /* The group_commit_orderer object for the events currently being queued. */
+ group_commit_orderer *current_gco;
+
+ rpl_parallel_thread * choose_thread(Relay_log_info *rli, bool *did_enter_cond,
+ const char **old_msg, bool reuse);
+ group_commit_orderer *get_gco();
+ void free_gco(group_commit_orderer *gco);
};
struct rpl_parallel {
HASH domain_hash;
@@ -116,7 +222,7 @@ struct rpl_parallel {
~rpl_parallel();
void reset();
rpl_parallel_entry *find(uint32 domain_id);
- void wait_for_done();
+ void wait_for_done(THD *thd);
bool workers_idle();
bool do_event(rpl_group_info *serial_rgi, Log_event *ev,
ulonglong event_size);