summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mysql-test/suite/rpl/r/rpl_parallel_retry.result62
-rw-r--r--mysql-test/suite/rpl/t/rpl_parallel_retry.test91
-rw-r--r--sql/rpl_parallel.cc163
-rw-r--r--sql/rpl_parallel.h6
-rw-r--r--sql/rpl_rli.cc39
-rw-r--r--sql/rpl_rli.h36
-rw-r--r--sql/slave.cc14
-rw-r--r--sql/slave.h1
8 files changed, 394 insertions, 18 deletions
diff --git a/mysql-test/suite/rpl/r/rpl_parallel_retry.result b/mysql-test/suite/rpl/r/rpl_parallel_retry.result
new file mode 100644
index 00000000000..352285a91bf
--- /dev/null
+++ b/mysql-test/suite/rpl/r/rpl_parallel_retry.result
@@ -0,0 +1,62 @@
+include/rpl_init.inc [topology=1->2]
+*** Test retry of transactions that fail to replicate due to deadlock or similar temporary error. ***
+ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
+CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB;
+INSERT INTO t1 VALUES (1,1);
+SET sql_log_bin=0;
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+RETURNS INT DETERMINISTIC
+BEGIN
+RETURN x;
+END
+||
+SET sql_log_bin=1;
+SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
+include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=5;
+include/start_slave.inc
+SET sql_log_bin=0;
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+RETURNS INT DETERMINISTIC
+BEGIN
+IF d1 != '' THEN
+SET debug_sync = d1;
+END IF;
+IF d2 != '' THEN
+SET debug_sync = d2;
+END IF;
+RETURN x;
+END
+||
+include/stop_slave.inc
+SET @old_format= @@SESSION.binlog_format;
+SET binlog_format='statement';
+SET gtid_seq_no = 100;
+BEGIN;
+INSERT INTO t1 VALUES (2,1);
+UPDATE t1 SET b=b+1 WHERE a=1;
+INSERT INTO t1 VALUES (3,1);
+COMMIT;
+SET binlog_format=@old_format;
+SELECT * FROM t1 ORDER BY a;
+a b
+1 2
+2 1
+3 1
+SET @old_dbug= @@GLOBAL.debug_dbug;
+SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_gtid_0_1_100";
+include/start_slave.inc
+SET GLOBAL debug_dbug=@old_dbug;
+retries
+1
+SELECT * FROM t1 ORDER BY a;
+a b
+1 2
+2 1
+3 1
+include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=@old_parallel_threads;
+include/start_slave.inc
+DROP TABLE t1;
+DROP function foo;
+include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/t/rpl_parallel_retry.test b/mysql-test/suite/rpl/t/rpl_parallel_retry.test
new file mode 100644
index 00000000000..edf00269737
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_parallel_retry.test
@@ -0,0 +1,91 @@
+--source include/have_innodb.inc
+--source include/have_debug.inc
+--source include/have_debug_sync.inc
+--let $rpl_topology=1->2
+--source include/rpl_init.inc
+
+--echo *** Test retry of transactions that fail to replicate due to deadlock or similar temporary error. ***
+
+--connection server_1
+ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
+CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB;
+INSERT INTO t1 VALUES (1,1);
+--save_master_pos
+
+# Use a stored function to inject a debug_sync into the appropriate THD.
+# The function does nothing on the master, and on the slave it injects the
+# desired debug_sync action(s).
+SET sql_log_bin=0;
+--delimiter ||
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+ RETURNS INT DETERMINISTIC
+ BEGIN
+ RETURN x;
+ END
+||
+--delimiter ;
+SET sql_log_bin=1;
+
+--connection server_2
+SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
+--source include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=5;
+--source include/start_slave.inc
+--sync_with_master
+SET sql_log_bin=0;
+--delimiter ||
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+ RETURNS INT DETERMINISTIC
+ BEGIN
+ IF d1 != '' THEN
+ SET debug_sync = d1;
+ END IF;
+ IF d2 != '' THEN
+ SET debug_sync = d2;
+ END IF;
+ RETURN x;
+ END
+||
+--delimiter ;
+--source include/stop_slave.inc
+
+--connection server_1
+SET @old_format= @@SESSION.binlog_format;
+SET binlog_format='statement';
+SET gtid_seq_no = 100;
+BEGIN;
+INSERT INTO t1 VALUES (2,1);
+UPDATE t1 SET b=b+1 WHERE a=1;
+#INSERT INTO t1 VALUES (3,foo(1,
+# "ha_write_row_end SIGNAL q1_ready WAIT_FOR q1_cont",
+# ""));
+INSERT INTO t1 VALUES (3,1);
+COMMIT;
+SET binlog_format=@old_format;
+SELECT * FROM t1 ORDER BY a;
+--save_master_pos
+
+--connection server_2
+SET @old_dbug= @@GLOBAL.debug_dbug;
+SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_gtid_0_1_100";
+let $old_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1);
+--source include/start_slave.inc
+--sync_with_master
+SET GLOBAL debug_dbug=@old_dbug;
+let $new_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1);
+--disable_query_log
+eval SELECT $new_retry - $old_retry AS retries;
+--enable_query_log
+
+SELECT * FROM t1 ORDER BY a;
+
+--connection server_2
+--source include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=@old_parallel_threads;
+--source include/start_slave.inc
+
+--connection server_1
+DROP TABLE t1;
+DROP function foo;
+
+--source include/rpl_end.inc
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 53769107661..f0147527957 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -7,15 +7,6 @@
/*
Code for optional parallel execution of replicated events on the slave.
-
- ToDo list:
-
- - Retry of failed transactions is not yet implemented for the parallel case.
-
- - All the waits (eg. in struct wait_for_commit and in
- rpl_parallel_thread_pool::get_thread()) need to be killable. And on kill,
- everything needs to be correctly rolled back and stopped in all threads,
- to ensure a consistent slave replication state.
*/
struct rpl_parallel_thread_pool global_rpl_thread_pool;
@@ -197,6 +188,105 @@ unlock_or_exit_cond(THD *thd, mysql_mutex_t *lock, bool *did_enter_cond,
}
+static int
+retry_handle_relay_log_rotate(Log_event *ev, IO_CACHE *rlog)
+{
+ /* ToDo */
+ return 0;
+}
+
+
+static int
+retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
+ rpl_parallel_thread::queued_event *orig_qev)
+{
+ IO_CACHE rlog;
+ File fd;
+ const char *errmsg= NULL;
+ inuse_relaylog *ir= rgi->relay_log;
+ uint64 event_count= 0;
+ uint64 events_to_execute= rgi->retry_event_count;
+ Relay_log_info *rli= rgi->rli;
+ int err= 0;
+ ulonglong cur_offset, old_offset;
+ char log_name[FN_REFLEN];
+ THD *thd= rgi->thd;
+
+do_retry:
+ rgi->cleanup_context(thd, 1);
+
+ mysql_mutex_lock(&rli->data_lock);
+ ++rli->retried_trans;
+ statistic_increment(slave_retried_transactions, LOCK_status);
+ mysql_mutex_unlock(&rli->data_lock);
+
+ strcpy(log_name, ir->name);
+ if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
+ return 1;
+ cur_offset= rgi->retry_start_offset;
+ my_b_seek(&rlog, cur_offset);
+
+ do
+ {
+ Log_event_type event_type;
+ Log_event *ev;
+
+ old_offset= cur_offset;
+ ev= Log_event::read_log_event(&rlog, 0,
+ rli->relay_log.description_event_for_exec /* ToDo: this needs fixing */,
+ opt_slave_sql_verify_checksum);
+ cur_offset= my_b_tell(&rlog);
+
+ if (!ev)
+ {
+ err= 1;
+ goto err;
+ }
+ ev->thd= thd;
+ event_type= ev->get_type_code();
+ if (Log_event::is_group_event(event_type))
+ {
+ rpl_parallel_thread::queued_event *qev;
+
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ qev= rpt->retry_get_qev(ev, orig_qev, log_name, cur_offset,
+ cur_offset - old_offset);
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ if (!qev)
+ {
+ delete ev;
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ err= 1;
+ goto err;
+ }
+ err= rpt_handle_event(qev, rpt);
+ ++event_count;
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ rpt->free_qev(qev);
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ }
+ else
+ err= retry_handle_relay_log_rotate(ev, &rlog);
+ delete_or_keep_event_post_apply(rgi, event_type, ev);
+
+ if (err)
+ {
+ /* ToDo: Need to here also handle second retry. */
+ goto err;
+ }
+
+ // ToDo: handle too many retries.
+
+ } while (event_count < events_to_execute);
+
+err:
+
+ end_io_cache(&rlog);
+ mysql_file_close(fd, MYF(MY_WME));
+ return err;
+}
+
+
pthread_handler_t
handle_rpl_parallel_thread(void *arg)
{
@@ -499,7 +589,23 @@ handle_rpl_parallel_thread(void *arg)
everything is stopped and cleaned up correctly.
*/
if (likely(!rgi->worker_error) && !skip_event_group)
+ {
+ ++rgi->retry_event_count;
err= rpt_handle_event(events, rpt);
+ DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_gtid_0_1_100",
+ if (rgi->current_gtid.domain_id == 0 &&
+ rgi->current_gtid.server_id == 1 &&
+ rgi->current_gtid.seq_no == 100 &&
+ rgi->retry_event_count == 4)
+ {
+ thd->clear_error();
+ thd->get_stmt_da()->reset_diagnostics_area();
+ my_error(ER_LOCK_DEADLOCK, MYF(0));
+ err= 1;
+ };);
+ if (err && has_temporary_error(thd))
+ err= retry_event_group(rgi, rpt, events);
+ }
else
err= thd->wait_for_prior_commit();
@@ -802,8 +908,7 @@ err:
rpl_parallel_thread::queued_event *
-rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size,
- Relay_log_info *rli)
+rpl_parallel_thread::get_qev_common(Log_event *ev, ulonglong event_size)
{
queued_event *qev;
mysql_mutex_assert_owner(&LOCK_rpl_thread);
@@ -817,6 +922,17 @@ rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size,
qev->ev= ev;
qev->event_size= event_size;
qev->next= NULL;
+ return qev;
+}
+
+
+rpl_parallel_thread::queued_event *
+rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size,
+ Relay_log_info *rli)
+{
+ queued_event *qev= get_qev_common(ev, event_size);
+ if (!qev)
+ return NULL;
strcpy(qev->event_relay_log_name, rli->event_relay_log_name);
qev->event_relay_log_pos= rli->event_relay_log_pos;
qev->future_event_relay_log_pos= rli->future_event_relay_log_pos;
@@ -825,6 +941,24 @@ rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size,
}
+rpl_parallel_thread::queued_event *
+rpl_parallel_thread::retry_get_qev(Log_event *ev, queued_event *orig_qev,
+ const char *relay_log_name,
+ ulonglong event_pos, ulonglong event_size)
+{
+ queued_event *qev= get_qev_common(ev, event_size);
+ if (!qev)
+ return NULL;
+ qev->rgi= orig_qev->rgi;
+ strcpy(qev->event_relay_log_name, relay_log_name);
+ qev->event_relay_log_pos= event_pos;
+ qev->future_event_relay_log_pos= event_pos+event_size;
+ strcpy(qev->future_event_master_log_name,
+ orig_qev->future_event_master_log_name);
+ return qev;
+}
+
+
void
rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev)
{
@@ -836,7 +970,7 @@ rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev)
rpl_group_info*
rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
- rpl_parallel_entry *e)
+ rpl_parallel_entry *e, ulonglong event_size)
{
rpl_group_info *rgi;
mysql_mutex_assert_owner(&LOCK_rpl_thread);
@@ -864,6 +998,9 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
return NULL;
}
rgi->parallel_entry= e;
+ rgi->relay_log= rli->last_inuse_relaylog;
+ rgi->retry_start_offset= rli->future_event_relay_log_pos-event_size;
+ rgi->retry_event_count= 0;
return rgi;
}
@@ -1439,7 +1576,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
{
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
- if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e)))
+ if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e, event_size)))
{
cur_thread->free_qev(qev);
abandon_worker_thread(rli->sql_driver_thd, cur_thread,
diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
index 1808efd0926..3b6641523f6 100644
--- a/sql/rpl_parallel.h
+++ b/sql/rpl_parallel.h
@@ -106,11 +106,15 @@ struct rpl_parallel_thread {
queued_size-= dequeue_size;
}
+ queued_event *get_qev_common(Log_event *ev, ulonglong event_size);
queued_event *get_qev(Log_event *ev, ulonglong event_size,
Relay_log_info *rli);
+ queued_event *retry_get_qev(Log_event *ev, queued_event *orig_qev,
+ const char *relay_log_name,
+ ulonglong event_pos, ulonglong event_size);
void free_qev(queued_event *qev);
rpl_group_info *get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
- rpl_parallel_entry *e);
+ rpl_parallel_entry *e, ulonglong event_size);
void free_rgi(rpl_group_info *rgi);
group_commit_orderer *get_gco(uint64 wait_count, group_commit_orderer *prev);
void free_gco(group_commit_orderer *gco);
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index cc543f7c377..3a3e22f970a 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -52,6 +52,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
info_fd(-1), cur_log_fd(-1), relay_log(&sync_relaylog_period),
sync_counter(0), is_relay_log_recovery(is_slave_recovery),
save_temporary_tables(0), mi(0),
+ inuse_relaylog_list(0), last_inuse_relaylog(0),
cur_log_old_open_count(0), group_relay_log_pos(0),
event_relay_log_pos(0),
#if HAVE_valgrind
@@ -98,8 +99,17 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
Relay_log_info::~Relay_log_info()
{
+ inuse_relaylog *cur;
DBUG_ENTER("Relay_log_info::~Relay_log_info");
+ cur= inuse_relaylog_list;
+ while (cur)
+ {
+ DBUG_ASSERT(cur->queued_count == cur->dequeued_count);
+ inuse_relaylog *next= cur->next;
+ my_free(cur);
+ cur= next;
+ }
mysql_mutex_destroy(&run_lock);
mysql_mutex_destroy(&data_lock);
mysql_mutex_destroy(&log_space_lock);
@@ -1339,6 +1349,29 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
DBUG_VOID_RETURN;
}
+
+int
+Relay_log_info::alloc_inuse_relaylog(const char *name)
+{
+ inuse_relaylog *ir;
+
+ if (!(ir= (inuse_relaylog *)my_malloc(sizeof(*ir), MYF(MY_WME|MY_ZEROFILL))))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir));
+ return 1;
+ }
+ strcpy(ir->name, name);
+
+ if (!inuse_relaylog_list)
+ inuse_relaylog_list= ir;
+ else
+ last_inuse_relaylog->next= ir;
+ last_inuse_relaylog= ir;
+
+ return 0;
+}
+
+
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
int
rpl_load_gtid_slave_state(THD *thd)
@@ -1623,7 +1656,7 @@ delete_or_keep_event_post_apply(rpl_group_info *rgi,
void rpl_group_info::cleanup_context(THD *thd, bool error)
{
- DBUG_ENTER("Relay_log_info::cleanup_context");
+ DBUG_ENTER("rpl_group_info::cleanup_context");
DBUG_PRINT("enter", ("error: %d", (int) error));
DBUG_ASSERT(this->thd == thd);
@@ -1689,7 +1722,7 @@ void rpl_group_info::cleanup_context(THD *thd, bool error)
void rpl_group_info::clear_tables_to_lock()
{
- DBUG_ENTER("Relay_log_info::clear_tables_to_lock()");
+ DBUG_ENTER("rpl_group_info::clear_tables_to_lock()");
#ifndef DBUG_OFF
/**
When replicating in RBR and MyISAM Merge tables are involved
@@ -1736,7 +1769,7 @@ void rpl_group_info::clear_tables_to_lock()
void rpl_group_info::slave_close_thread_tables(THD *thd)
{
- DBUG_ENTER("Relay_log_info::slave_close_thread_tables(THD *thd)");
+ DBUG_ENTER("rpl_group_info::slave_close_thread_tables(THD *thd)");
thd->get_stmt_da()->set_overwrite_status(true);
thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
thd->get_stmt_da()->set_overwrite_status(false);
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 00d16f52488..c2cdbcdc573 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -61,6 +61,7 @@ enum {
*****************************************************************************/
struct rpl_group_info;
+struct inuse_relaylog;
class Relay_log_info : public Slave_reporting_capability
{
@@ -164,6 +165,13 @@ public:
Master_info *mi;
/*
+ List of active relay log files.
+ (This can be more than one in case of parallel replication).
+ */
+ inuse_relaylog *inuse_relaylog_list;
+ inuse_relaylog *last_inuse_relaylog;
+
+ /*
Needed to deal properly with cur_log getting closed and re-opened with
a different log under our feet
*/
@@ -398,6 +406,7 @@ public:
void stmt_done(my_off_t event_log_pos,
time_t event_creation_time, THD *thd,
rpl_group_info *rgi);
+ int alloc_inuse_relaylog(const char *name);
/**
Is the replication inside a group?
@@ -464,6 +473,25 @@ private:
/*
+ In parallel replication, if we need to re-try a transaction due to a
+ deadlock or other temporary error, we may need to go back and re-read events
+ out of an earlier relay log.
+
+ This structure keeps track of the relaylogs that are potentially in use.
+ Each rpl_group_info has a pointer to one of those, corresponding to the
+ first GTID event.
+
+ A reference count keeps track of how long a relay log is potentially in use.
+*/
+struct inuse_relaylog {
+ inuse_relaylog *next;
+ uint64 queued_count;
+ uint64 dequeued_count;
+ char name[FN_REFLEN];
+};
+
+
+/*
This is data for various state needed to be kept for the processing of
one event group (transaction) during replication.
@@ -596,6 +624,14 @@ struct rpl_group_info
/* Needs room for "Gtid D-S-N\x00". */
char gtid_info_buf[5+10+1+10+1+20+1];
+ /*
+ Information to be able to re-try an event group in case of a deadlock or
+ other temporary error.
+ */
+ inuse_relaylog *relay_log;
+ uint64 retry_start_offset;
+ uint64 retry_event_count;
+
rpl_group_info(Relay_log_info *rli_);
~rpl_group_info();
void reinit(Relay_log_info *rli);
diff --git a/sql/slave.cc b/sql/slave.cc
index f755cb63558..ab505a4011f 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -3094,7 +3094,8 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings)
that the error is temporary by pushing a warning with the error code
ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary.
*/
-static int has_temporary_error(THD *thd)
+int
+has_temporary_error(THD *thd)
{
DBUG_ENTER("has_temporary_error");
@@ -4478,6 +4479,9 @@ pthread_handler_t handle_slave_sql(void *arg)
"Error initializing relay log position: %s", errmsg);
goto err;
}
+ if (rli->alloc_inuse_relaylog(rli->group_relay_log_name))
+ goto err;
+
strcpy(rli->future_event_master_log_name, rli->group_master_log_name);
THD_CHECK_SENTRY(thd);
#ifndef DBUG_OFF
@@ -6521,6 +6525,12 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size)
mysql_mutex_unlock(log_lock);
goto err;
}
+ if (rli->alloc_inuse_relaylog(rli->linfo.log_file_name))
+ {
+ if (!hot_log)
+ mysql_mutex_unlock(log_lock);
+ goto err;
+ }
if (!hot_log)
mysql_mutex_unlock(log_lock);
continue;
@@ -6536,6 +6546,8 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size)
if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
&errmsg)) <0)
goto err;
+ if (rli->alloc_inuse_relaylog(rli->linfo.log_file_name))
+ goto err;
}
else
{
diff --git a/sql/slave.h b/sql/slave.h
index 7352ac0274b..4b5bc1686fb 100644
--- a/sql/slave.h
+++ b/sql/slave.h
@@ -229,6 +229,7 @@ int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
void set_slave_thread_options(THD* thd);
void set_slave_thread_default_charset(THD *thd, rpl_group_info *rgi);
int rotate_relay_log(Master_info* mi);
+int has_temporary_error(THD *thd);
int apply_event_and_update_pos(Log_event* ev, THD* thd,
struct rpl_group_info *rgi,
rpl_parallel_thread *rpt);