summaryrefslogtreecommitdiff
path: root/src/repmgr
diff options
context:
space:
mode:
Diffstat (limited to 'src/repmgr')
-rw-r--r--src/repmgr/repmgr.msg44
-rw-r--r--src/repmgr/repmgr.src2
-rw-r--r--src/repmgr/repmgr_automsg.c206
-rw-r--r--src/repmgr/repmgr_elect.c214
-rw-r--r--src/repmgr/repmgr_method.c954
-rw-r--r--src/repmgr/repmgr_msg.c655
-rw-r--r--src/repmgr/repmgr_net.c186
-rw-r--r--src/repmgr/repmgr_posix.c2
-rw-r--r--src/repmgr/repmgr_queue.c132
-rw-r--r--src/repmgr/repmgr_rec.c10
-rw-r--r--src/repmgr/repmgr_sel.c726
-rw-r--r--src/repmgr/repmgr_stat.c74
-rw-r--r--src/repmgr/repmgr_stub.c74
-rw-r--r--src/repmgr/repmgr_util.c957
-rw-r--r--src/repmgr/repmgr_windows.c50
15 files changed, 3733 insertions, 553 deletions
diff --git a/src/repmgr/repmgr.msg b/src/repmgr/repmgr.msg
index 020f2e9c..ba544936 100644
--- a/src/repmgr/repmgr.msg
+++ b/src/repmgr/repmgr.msg
@@ -65,6 +65,11 @@ ARG port u_int16_t
END
BEGIN_MSG membership_data
+ARG status u_int32_t
+ARG flags u_int32_t
+END
+
+BEGIN_MSG v4membership_data
ARG flags u_int32_t
END
@@ -98,22 +103,51 @@ BEGIN_MSG membr_vers
ARG version u_int32_t
ARG gen u_int32_t
END
+
BEGIN_MSG site_info check_length
ARG host DBT
ARG port u_int16_t
+ARG status u_int32_t
+ARG flags u_int32_t
+END
+
+BEGIN_MSG v4site_info check_length
+ARG host DBT
+ARG port u_int16_t
ARG flags u_int32_t
END
/*
* If site A breaks or rejects a connection from site B, it first
* tries to send B this message containing site A's currently known
- * membership DB version. Site B can use this to decide what to do.
- * If site B knows of a later version, it should retry the connection
- * to site A later, polling at it until site A catches up. However, if
- * site B's known version is less, it means that site B is no longer in
- * the group, and so instead it should shut down and notify the application.
+ * membership DB version and site B's status in site A's membership DB.
+ * Site B can use them to decide what to do. If site B knows of a later
+ * version, it should retry the connection to site A later, polling
+ * until site A catches up. However, if site B's known version is
+ * less and site B's status is adding in site A's membership DB, it
+ * means that a badly-timed change of master may have caused the current
+ * master to lose B's membership DB update to present, so it should
+ * retry the connection to site A later, otherwise, site B is no longer
+ * in the group and it should shut down and notify the application.
*/
BEGIN_MSG connect_reject
ARG version u_int32_t
ARG gen u_int32_t
+ARG status u_int32_t
+END
+
+BEGIN_MSG v4connect_reject
+ARG version u_int32_t
+ARG gen u_int32_t
+END
+
+/*
+ * For preferred master LSN history comparison between the sites.
+ * The next_gen_lsn is [0,0] if the next generation doesn't yet exist.
+ */
+BEGIN_MSG lsnhist_match
+ARG lsn DB_LSN
+ARG hist_sec u_int32_t
+ARG hist_nsec u_int32_t
+ARG next_gen_lsn DB_LSN
END
diff --git a/src/repmgr/repmgr.src b/src/repmgr/repmgr.src
index 68d8c239..f42e159f 100644
--- a/src/repmgr/repmgr.src
+++ b/src/repmgr/repmgr.src
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2010, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2010, 2015 Oracle and/or its affiliates. All rights reserved.
*/
DBPRIVATE
diff --git a/src/repmgr/repmgr_automsg.c b/src/repmgr/repmgr_automsg.c
index 90af08ff..31bc4c35 100644
--- a/src/repmgr/repmgr_automsg.c
+++ b/src/repmgr/repmgr_automsg.c
@@ -463,6 +463,7 @@ __repmgr_membership_data_marshal(env, argp, bp)
__repmgr_membership_data_args *argp;
u_int8_t *bp;
{
+ DB_HTONL_COPYOUT(env, bp, argp->status);
DB_HTONL_COPYOUT(env, bp, argp->flags);
}
@@ -481,6 +482,7 @@ __repmgr_membership_data_unmarshal(env, argp, bp, max, nextp)
{
if (max < __REPMGR_MEMBERSHIP_DATA_SIZE)
goto too_few;
+ DB_NTOHL_COPYIN(env, argp->status, bp);
DB_NTOHL_COPYIN(env, argp->flags, bp);
if (nextp != NULL)
@@ -494,6 +496,46 @@ too_few:
}
/*
+ * PUBLIC: void __repmgr_v4membership_data_marshal __P((ENV *,
+ * PUBLIC: __repmgr_v4membership_data_args *, u_int8_t *));
+ */
+void
+__repmgr_v4membership_data_marshal(env, argp, bp)
+ ENV *env;
+ __repmgr_v4membership_data_args *argp;
+ u_int8_t *bp;
+{
+ DB_HTONL_COPYOUT(env, bp, argp->flags);
+}
+
+/*
+ * PUBLIC: int __repmgr_v4membership_data_unmarshal __P((ENV *,
+ * PUBLIC: __repmgr_v4membership_data_args *, u_int8_t *, size_t,
+ * PUBLIC: u_int8_t **));
+ */
+int
+__repmgr_v4membership_data_unmarshal(env, argp, bp, max, nextp)
+ ENV *env;
+ __repmgr_v4membership_data_args *argp;
+ u_int8_t *bp;
+ size_t max;
+ u_int8_t **nextp;
+{
+ if (max < __REPMGR_V4MEMBERSHIP_DATA_SIZE)
+ goto too_few;
+ DB_NTOHL_COPYIN(env, argp->flags, bp);
+
+ if (nextp != NULL)
+ *nextp = bp;
+ return (0);
+
+too_few:
+ __db_errx(env, DB_STR("3675",
+ "Not enough input bytes to fill a __repmgr_v4membership_data message"));
+ return (EINVAL);
+}
+
+/*
* PUBLIC: void __repmgr_member_metadata_marshal __P((ENV *,
* PUBLIC: __repmgr_member_metadata_args *, u_int8_t *));
*/
@@ -669,6 +711,7 @@ __repmgr_site_info_marshal(env, argp, bp, max, lenp)
bp += argp->host.size;
}
DB_HTONS_COPYOUT(env, bp, argp->port);
+ DB_HTONL_COPYOUT(env, bp, argp->status);
DB_HTONL_COPYOUT(env, bp, argp->flags);
*lenp = (size_t)(bp - start);
@@ -702,6 +745,7 @@ __repmgr_site_info_unmarshal(env, argp, bp, max, nextp)
goto too_few;
bp += argp->host.size;
DB_NTOHS_COPYIN(env, argp->port, bp);
+ DB_NTOHL_COPYIN(env, argp->status, bp);
DB_NTOHL_COPYIN(env, argp->flags, bp);
if (nextp != NULL)
@@ -715,6 +759,75 @@ too_few:
}
/*
+ * PUBLIC: int __repmgr_v4site_info_marshal __P((ENV *,
+ * PUBLIC: __repmgr_v4site_info_args *, u_int8_t *, size_t, size_t *));
+ */
+int
+__repmgr_v4site_info_marshal(env, argp, bp, max, lenp)
+ ENV *env;
+ __repmgr_v4site_info_args *argp;
+ u_int8_t *bp;
+ size_t *lenp, max;
+{
+ u_int8_t *start;
+
+ if (max < __REPMGR_V4SITE_INFO_SIZE
+ + (size_t)argp->host.size)
+ return (ENOMEM);
+ start = bp;
+
+ DB_HTONL_COPYOUT(env, bp, argp->host.size);
+ if (argp->host.size > 0) {
+ memcpy(bp, argp->host.data, argp->host.size);
+ bp += argp->host.size;
+ }
+ DB_HTONS_COPYOUT(env, bp, argp->port);
+ DB_HTONL_COPYOUT(env, bp, argp->flags);
+
+ *lenp = (size_t)(bp - start);
+ return (0);
+}
+
+/*
+ * PUBLIC: int __repmgr_v4site_info_unmarshal __P((ENV *,
+ * PUBLIC: __repmgr_v4site_info_args *, u_int8_t *, size_t, u_int8_t **));
+ */
+int
+__repmgr_v4site_info_unmarshal(env, argp, bp, max, nextp)
+ ENV *env;
+ __repmgr_v4site_info_args *argp;
+ u_int8_t *bp;
+ size_t max;
+ u_int8_t **nextp;
+{
+ size_t needed;
+
+ needed = __REPMGR_V4SITE_INFO_SIZE;
+ if (max < needed)
+ goto too_few;
+ DB_NTOHL_COPYIN(env, argp->host.size, bp);
+ if (argp->host.size == 0)
+ argp->host.data = NULL;
+ else
+ argp->host.data = bp;
+ needed += (size_t)argp->host.size;
+ if (max < needed)
+ goto too_few;
+ bp += argp->host.size;
+ DB_NTOHS_COPYIN(env, argp->port, bp);
+ DB_NTOHL_COPYIN(env, argp->flags, bp);
+
+ if (nextp != NULL)
+ *nextp = bp;
+ return (0);
+
+too_few:
+ __db_errx(env, DB_STR("3675",
+ "Not enough input bytes to fill a __repmgr_v4site_info message"));
+ return (EINVAL);
+}
+
+/*
* PUBLIC: void __repmgr_connect_reject_marshal __P((ENV *,
* PUBLIC: __repmgr_connect_reject_args *, u_int8_t *));
*/
@@ -726,6 +839,7 @@ __repmgr_connect_reject_marshal(env, argp, bp)
{
DB_HTONL_COPYOUT(env, bp, argp->version);
DB_HTONL_COPYOUT(env, bp, argp->gen);
+ DB_HTONL_COPYOUT(env, bp, argp->status);
}
/*
@@ -744,6 +858,7 @@ __repmgr_connect_reject_unmarshal(env, argp, bp, max, nextp)
goto too_few;
DB_NTOHL_COPYIN(env, argp->version, bp);
DB_NTOHL_COPYIN(env, argp->gen, bp);
+ DB_NTOHL_COPYIN(env, argp->status, bp);
if (nextp != NULL)
*nextp = bp;
@@ -755,3 +870,94 @@ too_few:
return (EINVAL);
}
+/*
+ * PUBLIC: void __repmgr_v4connect_reject_marshal __P((ENV *,
+ * PUBLIC: __repmgr_v4connect_reject_args *, u_int8_t *));
+ */
+void
+__repmgr_v4connect_reject_marshal(env, argp, bp)
+ ENV *env;
+ __repmgr_v4connect_reject_args *argp;
+ u_int8_t *bp;
+{
+ DB_HTONL_COPYOUT(env, bp, argp->version);
+ DB_HTONL_COPYOUT(env, bp, argp->gen);
+}
+
+/*
+ * PUBLIC: int __repmgr_v4connect_reject_unmarshal __P((ENV *,
+ * PUBLIC: __repmgr_v4connect_reject_args *, u_int8_t *, size_t,
+ * PUBLIC: u_int8_t **));
+ */
+int
+__repmgr_v4connect_reject_unmarshal(env, argp, bp, max, nextp)
+ ENV *env;
+ __repmgr_v4connect_reject_args *argp;
+ u_int8_t *bp;
+ size_t max;
+ u_int8_t **nextp;
+{
+ if (max < __REPMGR_V4CONNECT_REJECT_SIZE)
+ goto too_few;
+ DB_NTOHL_COPYIN(env, argp->version, bp);
+ DB_NTOHL_COPYIN(env, argp->gen, bp);
+
+ if (nextp != NULL)
+ *nextp = bp;
+ return (0);
+
+too_few:
+ __db_errx(env, DB_STR("3675",
+ "Not enough input bytes to fill a __repmgr_v4connect_reject message"));
+ return (EINVAL);
+}
+
+/*
+ * PUBLIC: void __repmgr_lsnhist_match_marshal __P((ENV *,
+ * PUBLIC: __repmgr_lsnhist_match_args *, u_int8_t *));
+ */
+void
+__repmgr_lsnhist_match_marshal(env, argp, bp)
+ ENV *env;
+ __repmgr_lsnhist_match_args *argp;
+ u_int8_t *bp;
+{
+ DB_HTONL_COPYOUT(env, bp, argp->lsn.file);
+ DB_HTONL_COPYOUT(env, bp, argp->lsn.offset);
+ DB_HTONL_COPYOUT(env, bp, argp->hist_sec);
+ DB_HTONL_COPYOUT(env, bp, argp->hist_nsec);
+ DB_HTONL_COPYOUT(env, bp, argp->next_gen_lsn.file);
+ DB_HTONL_COPYOUT(env, bp, argp->next_gen_lsn.offset);
+}
+
+/*
+ * PUBLIC: int __repmgr_lsnhist_match_unmarshal __P((ENV *,
+ * PUBLIC: __repmgr_lsnhist_match_args *, u_int8_t *, size_t, u_int8_t **));
+ */
+int
+__repmgr_lsnhist_match_unmarshal(env, argp, bp, max, nextp)
+ ENV *env;
+ __repmgr_lsnhist_match_args *argp;
+ u_int8_t *bp;
+ size_t max;
+ u_int8_t **nextp;
+{
+ if (max < __REPMGR_LSNHIST_MATCH_SIZE)
+ goto too_few;
+ DB_NTOHL_COPYIN(env, argp->lsn.file, bp);
+ DB_NTOHL_COPYIN(env, argp->lsn.offset, bp);
+ DB_NTOHL_COPYIN(env, argp->hist_sec, bp);
+ DB_NTOHL_COPYIN(env, argp->hist_nsec, bp);
+ DB_NTOHL_COPYIN(env, argp->next_gen_lsn.file, bp);
+ DB_NTOHL_COPYIN(env, argp->next_gen_lsn.offset, bp);
+
+ if (nextp != NULL)
+ *nextp = bp;
+ return (0);
+
+too_few:
+ __db_errx(env, DB_STR("3675",
+ "Not enough input bytes to fill a __repmgr_lsnhist_match message"));
+ return (EINVAL);
+}
+
diff --git a/src/repmgr/repmgr_elect.c b/src/repmgr/repmgr_elect.c
index 3a84694a..15a2de7b 100644
--- a/src/repmgr/repmgr_elect.c
+++ b/src/repmgr/repmgr_elect.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2005, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2005, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -12,9 +12,9 @@
static db_timeout_t __repmgr_compute_response_time __P((ENV *));
static int __repmgr_elect __P((ENV *, u_int32_t, db_timespec *));
-static int __repmgr_elect_main __P((ENV *, REPMGR_RUNNABLE *));
+static int __repmgr_elect_main __P((ENV *,
+ DB_THREAD_INFO *, REPMGR_RUNNABLE *));
static void *__repmgr_elect_thread __P((void *));
-static int send_membership __P((ENV *));
/*
* Starts an election thread.
@@ -90,26 +90,39 @@ __repmgr_elect_thread(argsp)
{
REPMGR_RUNNABLE *th;
ENV *env;
+ DB_THREAD_INFO *ip;
int ret;
th = argsp;
env = th->env;
+ ip = NULL;
+ ret = 0;
- RPRINT(env, (env, DB_VERB_REPMGR_MISC, "starting election thread"));
+ ENV_ENTER_RET(env, ip, ret);
+ if (ret == 0)
+ RPRINT(env, (env,
+ DB_VERB_REPMGR_MISC, "starting election thread"));
- if ((ret = __repmgr_elect_main(env, th)) != 0) {
+ if (ret != 0 || (ret = __repmgr_elect_main(env, ip, th)) != 0) {
__db_err(env, ret, "election thread failed");
+ RPRINT(env, (env,
+ DB_VERB_REPMGR_MISC, "election thread is exiting"));
+ ENV_LEAVE(env, ip);
(void)__repmgr_thread_failure(env, ret);
}
-
- RPRINT(env, (env, DB_VERB_REPMGR_MISC, "election thread is exiting"));
+ if (ret == 0) {
+ RPRINT(env, (env,
+ DB_VERB_REPMGR_MISC, "election thread is exiting"));
+ ENV_LEAVE(env, ip);
+ }
th->finished = TRUE;
return (NULL);
}
static int
-__repmgr_elect_main(env, th)
+__repmgr_elect_main(env, ip, th)
ENV *env;
+ DB_THREAD_INFO *ip;
REPMGR_RUNNABLE *th;
{
DB_REP *db_rep;
@@ -123,10 +136,13 @@ __repmgr_elect_main(env, th)
db_timespec failtime, now, repstart_time, target, wait_til;
db_timeout_t delay_time, response_time, tmp_time;
u_long sec, usec;
- u_int32_t flags;
- int done_repstart, ret, suppress_election;
+ u_int32_t flags, max_tries, tries;
+ int client_detected, done_repstart, lsnhist_match, master_detected;
+ int ret, suppress_election;
enum { ELECTION, REPSTART } action;
+ COMPQUIET(usec, 0);
+ COMPQUIET(max_tries, 0);
COMPQUIET(action, ELECTION);
db_rep = env->rep_handle;
@@ -181,6 +197,120 @@ __repmgr_elect_main(env, th)
UNLOCK_MUTEX(db_rep->mutex);
/*
+ * In preferred master mode, the select thread signals when a
+ * client has lost its connection to the master via prefmas_pending,
+ * but the actual restart as temporary master is done here in an
+ * election thread.
+ */
+ if (IS_PREFMAS_MODE(env) && F_ISSET(rep, REP_F_CLIENT) &&
+ db_rep->prefmas_pending == start_temp_master) {
+ db_rep->prefmas_pending = no_action;
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "elect_main preferred master restart temp master"));
+ ret = __repmgr_become_master(env, 0);
+ goto out;
+ }
+
+ /* Get preferred master wait limits for detecting the other site. */
+ if (IS_PREFMAS_MODE(env) &&
+ (ret = __repmgr_prefmas_get_wait(env, &max_tries, &usec)) != 0)
+ goto out;
+
+ /* Preferred master mode master site start-up. */
+ if (IS_PREFMAS_MODE(env) &&
+ FLD_ISSET(rep->config, REP_C_PREFMAS_MASTER) &&
+ LF_ISSET(ELECT_F_STARTUP)) {
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "elect_main preferred master site startup"));
+ client_detected = FALSE;
+ lsnhist_match = FALSE;
+ tries = 0;
+ while (!client_detected && tries < max_tries) {
+ __os_yield(env, 0, usec);
+ tries++;
+ client_detected = __repmgr_prefmas_connected(env);
+ }
+ if (client_detected) {
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "elect_main preferred master client detected"));
+ /*
+ * Restart remote site as a client. Depending on the
+ * outcome of lsnhist_match below, this site will
+ * either restart as master or it will start an
+ * election. In either case, the remote site should
+ * be running as a client.
+ *
+ * Then perform the lsnhist_match comparison.
+ */
+ if ((ret = __repmgr_restart_site_as_client(
+ env, 1)) != 0 ||
+ (ret = __repmgr_lsnhist_match(env,
+ ip, 1, &lsnhist_match)) != 0)
+ goto out;
+ /*
+ * An lsnhist_match means that we have a continuous
+ * set of transactions and it is safe to call a
+ * comparison election to preserve any temporary master
+ * transactions that were committed while this site
+ * was down.
+ */
+ if (lsnhist_match) {
+ F_CLR(rep, REP_F_HOLD_GEN);
+ LF_SET(ELECT_F_IMMED);
+ LF_CLR(ELECT_F_STARTUP);
+ /* Continue on to election code below. */
+ }
+ }
+ /*
+ * If we didn't detect a client within a reasonable time or
+ * we failed the lsnhist_match (meaning we have conflicting
+ * sets of transactions), we start this site as a master and
+ * possibly force rollback of temporary master transactions.
+ */
+ if (!client_detected || !lsnhist_match) {
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "elect_main preferred master site start master"));
+ ret = __repmgr_become_master(env, 0);
+ F_CLR(rep, REP_F_HOLD_GEN);
+ goto out;
+ }
+ }
+
+ /* Preferred master mode client site start-up. */
+ if (IS_PREFMAS_MODE(env) &&
+ FLD_ISSET(rep->config, REP_C_PREFMAS_CLIENT) &&
+ LF_ISSET(ELECT_F_STARTUP)) {
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "elect_main preferred master client site startup"));
+ master_detected = FALSE;
+ tries = 0;
+ while (!master_detected && tries < max_tries) {
+ __os_yield(env, 0, usec);
+ tries++;
+ master_detected = __repmgr_prefmas_connected(env);
+ }
+ /*
+ * If we find the master, restart as client here so that we
+ * send a newclient message after we are connected to the
+ * master. The master will send a newmaster message so that
+ * we can start the client sync process.
+ *
+ * If we haven't found the master after the timeout, start as
+ * temporary master.
+ */
+ if (master_detected) {
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "elect_main preferred master detected"));
+ ret = __repmgr_become_client(env);
+ } else {
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "elect_main preferred master client start master"));
+ ret = __repmgr_become_master(env, 0);
+ }
+ goto out;
+ }
+
+ /*
* The 'done_repstart' flag keeps track of which was our most recent
* operation (repstart or election), so that we can alternate
* appropriately. There are a few different ways this thread can be
@@ -188,7 +318,7 @@ __repmgr_elect_main(env, th)
* called. The one exception is at initial start-up, where we
* first probe for a master by sending out rep_start(CLIENT) calls.
*/
- if (LF_ISSET(ELECT_F_IMMED)) {
+ if (LF_ISSET(ELECT_F_IMMED) && !IS_VIEW_SITE(env)) {
/*
* When the election succeeds, we've successfully completed
* everything we need to do. If it fails in an unexpected way,
@@ -256,11 +386,13 @@ __repmgr_elect_main(env, th)
/*
* See if it's time to retry the operation. Normally it's an
* election we're interested in retrying. But we refrain from
- * calling for elections if so configured.
+ * calling for elections if so configured or we are a view.
*/
- suppress_election = LF_ISSET(ELECT_F_STARTUP) ?
+ suppress_election = IS_VIEW_SITE(env) ||
+ (LF_ISSET(ELECT_F_STARTUP) ?
db_rep->init_policy == DB_REP_CLIENT :
- !FLD_ISSET(rep->config, REP_C_ELECTIONS);
+ !FLD_ISSET(rep->config, REP_C_ELECTIONS)) ||
+ LF_ISSET(ELECT_F_CLIENT_RESTART);
repstart_time = db_rep->repstart_time;
target = suppress_election ? repstart_time : failtime;
TIMESPEC_ADD_DB_TIMEOUT(&target, rep->election_retry_wait);
@@ -343,7 +475,8 @@ __repmgr_elect_main(env, th)
DB_ASSERT(env, action == REPSTART);
db_rep->new_connection = FALSE;
- if ((ret = __repmgr_repstart(env, DB_REP_CLIENT)) != 0)
+ if ((ret = __repmgr_repstart(env,
+ DB_REP_CLIENT, 0)) != 0)
goto out;
done_repstart = TRUE;
@@ -476,7 +609,20 @@ __repmgr_elect(env, flags, failtimep)
case DB_REP_UNAVAIL:
__os_gettime(env, failtimep, 1);
DB_EVENT(env, DB_EVENT_REP_ELECTION_FAILED, NULL);
- if ((t_ret = send_membership(env)) != 0)
+ /*
+ * If an election fails with DB_REP_UNAVAIL, it could be
+ * because a participating site has an obsolete, too-high
+ * notion of the group size. (This could happen if the site
+ * was down/disconnected during removal of some (other) sites.)
+ * To remedy this, broadcast a current copy of the membership
+ * list. Since all sites are doing this, and we always ratchet
+ * to the most up-to-date version, this should bring all sites
+ * up to date. We only do this after a failure, during what
+ * will normally be an idle period anyway, so that we don't
+ * slow down a first election following the loss of an active
+ * master.
+ */
+ if ((t_ret = __repmgr_bcast_member_list(env)) != 0)
ret = t_ret;
break;
@@ -498,40 +644,6 @@ __repmgr_elect(env, flags, failtimep)
}
/*
- * If an election fails with DB_REP_UNAVAIL, it could be because a participating
- * site has an obsolete, too-high notion of the group size. (This could happen
- * if the site was down/disconnected during removal of some (other) sites.) To
- * remedy this, broadcast a current copy of the membership list. Since all
- * sites are doing this, and we always ratchet to the most up-to-date version,
- * this should bring all sites up to date. We only do this after a failure,
- * during what will normally be an idle period anyway, so that we don't slow
- * down a first election following the loss of an active master.
- */
-static int
-send_membership(env)
- ENV *env;
-{
- DB_REP *db_rep;
- u_int8_t *buf;
- size_t len;
- int ret;
-
- db_rep = env->rep_handle;
- buf = NULL;
- LOCK_MUTEX(db_rep->mutex);
- if ((ret = __repmgr_marshal_member_list(env, &buf, &len)) != 0)
- goto out;
- RPRINT(env, (env, DB_VERB_REPMGR_MISC,
- "Broadcast latest membership list"));
- ret = __repmgr_bcast_own_msg(env, REPMGR_SHARING, buf, len);
-out:
- UNLOCK_MUTEX(db_rep->mutex);
- if (buf != NULL)
- __os_free(env, buf);
- return (ret);
-}
-
-/*
* Becomes master after we've won an election, if we can.
*
* PUBLIC: int __repmgr_claim_victory __P((ENV *));
@@ -543,7 +655,7 @@ __repmgr_claim_victory(env)
int ret;
env->rep_handle->takeover_pending = FALSE;
- if ((ret = __repmgr_become_master(env)) == DB_REP_UNAVAIL) {
+ if ((ret = __repmgr_become_master(env, 0)) == DB_REP_UNAVAIL) {
ret = 0;
RPRINT(env, (env, DB_VERB_REPMGR_MISC,
"Won election but lost race with DUPMASTER client intent"));
diff --git a/src/repmgr/repmgr_method.c b/src/repmgr/repmgr_method.c
index 229cf650..729ba5ff 100644
--- a/src/repmgr/repmgr_method.c
+++ b/src/repmgr/repmgr_method.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2005, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2005, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -29,19 +29,17 @@ static int get_channel_connection __P((CHANNEL *, REPMGR_CONNECTION **));
static int init_dbsite __P((ENV *, int, const char *, u_int, DB_SITE **));
static int join_group_at_site __P((ENV *, repmgr_netaddr_t *));
static int kick_blockers __P((ENV *, REPMGR_CONNECTION *, void *));
-static int make_request_conn __P((ENV *,
- repmgr_netaddr_t *, REPMGR_CONNECTION **));
static int set_local_site __P((DB_SITE *, u_int32_t));
-static int read_own_msg __P((ENV *,
- REPMGR_CONNECTION *, u_int32_t *, u_int8_t **, size_t *));
static int refresh_site __P((DB_SITE *));
static int __repmgr_await_threads __P((ENV *));
static int __repmgr_build_data_out __P((ENV *,
DBT *, u_int32_t, __repmgr_msg_metadata_args *, REPMGR_IOVECS **iovecsp));
static int __repmgr_build_msg_out __P((ENV *,
DBT *, u_int32_t, __repmgr_msg_metadata_args *, REPMGR_IOVECS **iovecsp));
+static int __repmgr_demote_site(ENV *, int);
static int repmgr_only __P((ENV *, const char *));
static int __repmgr_restart __P((ENV *, int, u_int32_t));
+static int __repmgr_remove_and_close_site __P((DB_SITE *));
static int __repmgr_remove_site __P((DB_SITE *));
static int __repmgr_remove_site_pp __P((DB_SITE *));
static int __repmgr_start_msg_threads __P((ENV *, u_int));
@@ -52,25 +50,21 @@ static int send_msg_self __P((ENV *, REPMGR_IOVECS *, u_int32_t));
static int site_by_addr __P((ENV *, const char *, u_int, DB_SITE **));
/*
- * PUBLIC: int __repmgr_start __P((DB_ENV *, int, u_int32_t));
+ * PUBLIC: int __repmgr_start_pp __P((DB_ENV *, int, u_int32_t));
*/
int
-__repmgr_start(dbenv, nthreads, flags)
+__repmgr_start_pp(dbenv, nthreads, flags)
DB_ENV *dbenv;
int nthreads;
u_int32_t flags;
{
DB_REP *db_rep;
- REP *rep;
- REPMGR_SITE *me, *site;
- DB_THREAD_INFO *ip;
ENV *env;
- int first, is_listener, locked, min, need_masterseek, ret, start_master;
- u_int i, n;
+ DB_THREAD_INFO *ip;
+ int ret;
env = dbenv->env;
db_rep = env->rep_handle;
- rep = db_rep->region;
switch (flags) {
case 0:
@@ -102,7 +96,27 @@ __repmgr_start(dbenv, nthreads, flags)
return (EINVAL);
}
- /* Check if it is a shut-down site, if so, clean the resources. */
+ /* A view site cannot be started as MASTER or ELECTION. */
+ if (IS_VIEW_SITE(env) &&
+ (flags == DB_REP_MASTER || flags == DB_REP_ELECTION)) {
+ __db_errx(env, DB_STR("3694",
+ "A view site must be started with DB_REP_CLIENT"));
+ return (EINVAL);
+ }
+
+ /* Must start site as client in preferred master mode. */
+ if (PREFMAS_IS_SET(env) &&
+ (flags == DB_REP_MASTER || flags == DB_REP_ELECTION)) {
+ __db_errx(env, DB_STR("3702",
+ "A preferred master site must be started with "
+ "DB_REP_CLIENT"));
+ return (EINVAL);
+ }
+
+ /*
+ * Check if it is a shut-down site, if so, clean the resources and
+ * reset the status in order to get ready to start replication.
+ */
if (db_rep->repmgr_status == stopped) {
if ((ret = __repmgr_stop(env)) != 0) {
__db_errx(env, DB_STR("3638",
@@ -112,7 +126,55 @@ __repmgr_start(dbenv, nthreads, flags)
db_rep->repmgr_status = ready;
}
+ /* Record the original configurations given by application. */
+ ENV_ENTER(env, ip);
db_rep->init_policy = flags;
+ db_rep->config_nthreads = nthreads;
+ ret = __repmgr_start_int(env, nthreads, flags);
+ ENV_LEAVE(env, ip);
+ return (ret);
+}
+
+/*
+ * Internal processing to start replication manager.
+ *
+ * PUBLIC: int __repmgr_start_int __P((ENV *, int, u_int32_t));
+ */
+int
+__repmgr_start_int(env, nthreads, flags)
+ ENV *env;
+ int nthreads;
+ u_int32_t flags;
+{
+ DB_LOG *dblp;
+ DB_REP *db_rep;
+ LOG *lp;
+ REP *rep;
+ REPMGR_SITE *me, *site;
+ u_int32_t startopts;
+ int first, flags_error, is_listener, locked, min;
+ int need_masterseek, ret, start_master;
+ u_int i, n;
+
+ db_rep = env->rep_handle;
+ rep = db_rep->region;
+ dblp = env->lg_handle;
+ lp = dblp->reginfo.primary;
+ flags_error = 0;
+ startopts = 0;
+
+ /*
+ * For preferred master master site startup, we need to save the
+ * log location at the end of our previous transactions for
+ * the lsnhist_match comparisons. Starting repmgr adds a few
+ * more log records that we don't want to count in lsnhist_match.
+ */
+ if (FLD_ISSET(rep->config, REP_C_PREFMAS_MASTER)) {
+ LOG_SYSTEM_LOCK(env);
+ db_rep->prefmas_init_lsn = lp->lsn;
+ LOG_SYSTEM_UNLOCK(env);
+ }
+
if ((ret = __rep_set_transport_int(env,
db_rep->self_eid, __repmgr_send)) != 0)
return (ret);
@@ -128,7 +190,8 @@ __repmgr_start(dbenv, nthreads, flags)
if (db_rep->restored_list != NULL) {
ret = __repmgr_refresh_membership(env,
- db_rep->restored_list, db_rep->restored_list_length);
+ db_rep->restored_list, db_rep->restored_list_length,
+ DB_REPMGR_VERSION);
__os_free(env, db_rep->restored_list);
db_rep->restored_list = NULL;
} else {
@@ -145,9 +208,15 @@ __repmgr_start(dbenv, nthreads, flags)
* join.
*/
ret = __repmgr_join_group(env);
+ else if (VIEW_TO_PARTICIPANT(db_rep, me)) {
+ __db_errx(env, DB_STR("3695",
+ "A view site must be started with a view callback"));
+ return (EINVAL);
+ }
} else if (ret == ENOENT) {
- ENV_ENTER(env, ip);
- if (FLD_ISSET(me->config, DB_GROUP_CREATOR))
+ if (FLD_ISSET(me->config, DB_GROUP_CREATOR) ||
+ (IS_PREFMAS_MODE(env) &&
+ FLD_ISSET(rep->config, REP_C_PREFMAS_MASTER)))
start_master = TRUE;
/*
* LEGACY is inconsistent with CREATOR, but start_master
@@ -166,10 +235,12 @@ __repmgr_start(dbenv, nthreads, flags)
continue;
if ((ret = __repmgr_set_membership(env,
site->net_addr.host,
- site->net_addr.port,
- SITE_PRESENT)) != 0)
+ site->net_addr.port, SITE_PRESENT,
+ site->gmdb_flags)) != 0)
break;
- n++;
+ if (!FLD_ISSET(site->gmdb_flags,
+ SITE_VIEW))
+ n++;
}
ret = __rep_set_nsites_int(env, n);
DB_ASSERT(env, ret == 0);
@@ -180,30 +251,27 @@ __repmgr_start(dbenv, nthreads, flags)
db_rep->member_version_gen = 1;
if ((ret = __repmgr_set_membership(env,
me->net_addr.host, me->net_addr.port,
- SITE_PRESENT)) == 0) {
+ SITE_PRESENT, 0)) == 0) {
ret = __rep_set_nsites_int(env, 1);
DB_ASSERT(env, ret == 0);
}
UNLOCK_MUTEX(db_rep->mutex);
} else
ret = __repmgr_join_group(env);
- ENV_LEAVE(env, ip);
} else if (ret == DB_DELETED)
ret = DB_REP_UNAVAIL;
}
if (ret != 0)
return (ret);
- DB_ASSERT(env, start_master ||
- SITE_FROM_EID(db_rep->self_eid)->membership == SITE_PRESENT);
-
/*
- * If we're the first repmgr_start() call, we will have to start threads.
- * Therefore, we require a flags value (to tell us how).
+ * Catch case where user defines a different local site address than
+ * the one in the restored_list from an ongoing internal init.
*/
- if (db_rep->repmgr_status != running && flags == 0) {
- __db_errx(env, DB_STR("3639",
- "a non-zero flags value is required for initial repmgr_start() call"));
+ if (!start_master &&
+ SITE_FROM_EID(db_rep->self_eid)->membership != SITE_PRESENT) {
+ __db_errx(env, DB_STR("3696",
+ "Current local site conflicts with earlier definition"));
return (EINVAL);
}
@@ -214,37 +282,54 @@ __repmgr_start(dbenv, nthreads, flags)
*
* Then, in case there could be multiple processes, we're either the
* main listener process or a subordinate process. On a "subsequent"
- * repmgr_start() call we already have enough information to know which
- * it is. Otherwise, negotiate with information in the shared region to
- * claim the listener role if possible.
+ * repmgr_start() call, with a running main listener process, we already
+ * have enough information to know which it is. Otherwise, if there is
+ * no listener, negotiate with information in the shared region to claim
+ * the listener role if possible. Once we decide we're the listener,
+ * mark the listener id in the shared region, so that no other process
+ * thinks the same thing.
*
* To avoid a race, once we decide we're in the first call, mark the
* handle as started, so that no other thread thinks the same thing.
*/
+ first = FALSE;
+ is_listener = FALSE;
LOCK_MUTEX(db_rep->mutex);
locked = TRUE;
- if (db_rep->repmgr_status == running) {
- first = FALSE;
+ if (db_rep->repmgr_status == running && !(rep->listener == 0 &&
+ FLD_ISSET(rep->config, REP_C_AUTOTAKEOVER)))
is_listener = !IS_SUBORDINATE(db_rep);
- } else {
+ else if (db_rep->repmgr_status != running &&
+ rep->listener == 0 && flags == 0)
+ flags_error = 1;
+ else {
first = TRUE;
db_rep->repmgr_status = running;
- ENV_ENTER(env, ip);
MUTEX_LOCK(env, rep->mtx_repmgr);
if (rep->listener == 0) {
is_listener = TRUE;
- __os_id(dbenv, &rep->listener, NULL);
- } else {
- is_listener = FALSE;
+ __os_id(env->dbenv, &rep->listener, NULL);
+ } else
nthreads = 0;
- }
MUTEX_UNLOCK(env, rep->mtx_repmgr);
- ENV_LEAVE(env, ip);
}
UNLOCK_MUTEX(db_rep->mutex);
locked = FALSE;
+ /*
+ * The first repmgr_start() call for the main listener process
+ * requires a flags value to tell us how to start up the site.
+ * But we don't require a flags value for the repmgr_start()
+ * call for a subordinate process because the site is already
+ * started and we would only ignore the value anyway.
+ */
+ if (flags_error) {
+ __db_errx(env, DB_STR("3639",
+ "A non-zero flags value is required for initial repmgr_start() call"));
+ return (EINVAL);
+ }
+
if (!first) {
/*
* Subsequent call is allowed when ELECTIONS are turned off, so
@@ -266,7 +351,7 @@ __repmgr_start(dbenv, nthreads, flags)
/*
* The minimum legal number of threads is either 1 or 0, depending upon
- * whether we're the main process or a subordinate.
+ * whether we're the listener process or a subordinate.
*/
min = is_listener ? 1 : 0;
if (nthreads < min) {
@@ -303,14 +388,24 @@ __repmgr_start(dbenv, nthreads, flags)
* of rep_start calls even within an env region lifetime.
*/
if (start_master) {
- ret = __repmgr_become_master(env);
+ ret = __repmgr_become_master(env, 0);
/* No other repmgr threads running yet. */
DB_ASSERT(env, ret != DB_REP_UNAVAIL);
if (ret != 0)
goto err;
need_masterseek = FALSE;
} else {
- if ((ret = __repmgr_repstart(env, DB_REP_CLIENT)) != 0)
+ /*
+ * The preferred master site cannot allow its gen
+ * to change until it has done its lsnhist_match to
+ * guarantee that no preferred master transactions
+ * will be rolled back.
+ */
+ if (IS_PREFMAS_MODE(env) &&
+ FLD_ISSET(rep->config, REP_C_PREFMAS_MASTER))
+ startopts = REP_START_HOLD_CLIGEN;
+ if ((ret = __repmgr_repstart(env,
+ DB_REP_CLIENT, startopts)) != 0)
goto err;
/*
* The repmgr election code starts elections only if
@@ -352,6 +447,7 @@ __repmgr_start(dbenv, nthreads, flags)
if ((ret =
__repmgr_start_msg_threads(env, (u_int)nthreads)) != 0)
goto err;
+ rep->listener_nthreads = (u_int)nthreads;
if (need_masterseek) {
/*
@@ -374,10 +470,47 @@ __repmgr_start(dbenv, nthreads, flags)
}
UNLOCK_MUTEX(db_rep->mutex);
locked = FALSE;
+ /*
+ * Turn on the DB_EVENT_REP_INQUEUE_FULL event firing. We only
+ * do this for the main listener process. For a subordinate
+ * process, it is always turned on.
+ */
+ rep->inqueue_full_event_on = 1;
+ }
+ if (db_rep->selector == NULL) {
+ /* All processes (even non-listeners) need a select() thread. */
+ if ((ret = __repmgr_start_selector(env)) == 0) {
+ /*
+ * A view callback is set but this site isn't yet a
+ * view in the internal site list. Do the view
+ * demotion here, which will update the internal
+ * site list. We need the select() thread for the
+ * demotion because the demotion performs gmdb
+ * operations.
+ */
+ if (PARTICIPANT_TO_VIEW(db_rep,
+ SITE_FROM_EID(db_rep->self_eid)) &&
+ (ret = __repmgr_demote_site(env,
+ db_rep->self_eid)) != 0)
+ goto err;
+ return (is_listener ? 0 : DB_REP_IGNORE);
+ }
+ } else {
+ /*
+ * If the selector thread already exists, the current process
+ * should be the new listener which has just finished a
+ * takeover. Now, all active connections need to be refreshed
+ * to notify remote sites about the new listener. If a new
+ * connection is established immediately, disable the existing
+ * main connection to the same site. Otherwise, schedule a
+ * second immediate attempt. If it still fails, disable the
+ * main connection and retry a connection as usual.
+ */
+ DB_ASSERT(env, is_listener &&
+ FLD_ISSET(rep->config, REP_C_AUTOTAKEOVER));
+ if ((ret = __repmgr_refresh_selector(env)) == 0)
+ return (0);
}
- /* All processes (even non-listeners) need a select() thread. */
- if ((ret = __repmgr_start_selector(env)) == 0)
- return (is_listener ? 0 : DB_REP_IGNORE);
err:
/* If we couldn't succeed at everything, undo the parts we did do. */
@@ -392,6 +525,16 @@ err:
if (!locked)
LOCK_MUTEX(db_rep->mutex);
(void)__repmgr_net_close(env);
+ /* Reset the listener when we fail before having a valid listen_fd. */
+ if (first && is_listener)
+ rep->listener = 0;
+ /*
+ * Reset repmgr_status when we fail before starting a selector if the
+ * earlier call to __repmgr_stop_threads() hasn't already reset it to
+ * stopped.
+ */
+ if (db_rep->repmgr_status == running)
+ db_rep->repmgr_status = ready;
UNLOCK_MUTEX(db_rep->mutex);
return (ret);
}
@@ -425,6 +568,53 @@ __repmgr_valid_config(env, flags)
}
/*
+ * Set priority, heartbeat and election_retry timeouts for preferred master
+ * mode. Turn on 2SITE_STRICT and ELECTIONS. Can be called whether or not
+ * REP_ON() is true
+ *
+ * PUBLIC: int __repmgr_prefmas_auto_config __P((DB_ENV *, u_int32_t *));
+ */
+int __repmgr_prefmas_auto_config (dbenv, config_flags)
+ DB_ENV *dbenv;
+ u_int32_t *config_flags;
+{
+ ENV * env;
+ db_timeout_t timeout;
+ int ret;
+
+ env = dbenv->env;
+ timeout = 0;
+
+ /* Change heartbeat timeouts if they are not already set. */
+ if ((ret = __rep_get_timeout(dbenv,
+ DB_REP_HEARTBEAT_MONITOR, &timeout)) == 0 &&
+ timeout == 0 && (ret = __rep_set_timeout_int(env,
+ DB_REP_HEARTBEAT_MONITOR,
+ DB_REPMGR_PREFMAS_HEARTBEAT_MONITOR)) != 0)
+ return (ret);
+ if ((ret = __rep_get_timeout(dbenv,
+ DB_REP_HEARTBEAT_SEND, &timeout)) == 0 &&
+ timeout == 0 && (ret = __rep_set_timeout_int(env,
+ DB_REP_HEARTBEAT_SEND, DB_REPMGR_PREFMAS_HEARTBEAT_SEND)) != 0)
+ return (ret);
+
+ /* Change election_retry timeout if it is still the default value. */
+ if ((ret = __rep_get_timeout(dbenv,
+ DB_REP_ELECTION_RETRY, &timeout)) == 0 &&
+ timeout == DB_REPMGR_DEFAULT_ELECTION_RETRY &&
+ (ret = __rep_set_timeout_int(env,
+ DB_REP_ELECTION_RETRY, DB_REPMGR_PREFMAS_ELECTION_RETRY)) != 0)
+ return (ret);
+
+ if ((ret = __rep_set_priority_int(env, FLD_ISSET(*config_flags,
+ REP_C_PREFMAS_MASTER) ? DB_REPMGR_PREFMAS_PRIORITY_MASTER :
+ DB_REPMGR_PREFMAS_PRIORITY_CLIENT)) != 0)
+ return (ret);
+ FLD_SET(*config_flags, REP_C_ELECTIONS | REP_C_2SITE_STRICT);
+ return (0);
+}
+
+/*
* Starts message processing threads. On entry, the actual number of threads
* already active is db_rep->nthreads; the desired number of threads is passed
* as "n".
@@ -473,7 +663,7 @@ __repmgr_restart(env, nthreads, flags)
REP *rep;
REPMGR_RUNNABLE **th;
u_int32_t cur_repflags;
- int locked, ret, t_ret;
+ int locked, ret, role_change, t_ret;
u_int delta, i, min, nth;
th = NULL;
@@ -491,6 +681,7 @@ __repmgr_restart(env, nthreads, flags)
}
ret = 0;
+ role_change = 0;
db_rep = env->rep_handle;
DB_ASSERT(env, REP_ON(env));
rep = db_rep->region;
@@ -498,11 +689,14 @@ __repmgr_restart(env, nthreads, flags)
cur_repflags = F_ISSET(rep, REP_F_MASTER | REP_F_CLIENT);
DB_ASSERT(env, cur_repflags);
if (FLD_ISSET(cur_repflags, REP_F_MASTER) &&
- flags == DB_REP_CLIENT)
+ flags == DB_REP_CLIENT) {
ret = __repmgr_become_client(env);
- else if (FLD_ISSET(cur_repflags, REP_F_CLIENT) &&
- flags == DB_REP_MASTER)
- ret = __repmgr_become_master(env);
+ role_change = 1;
+ } else if (FLD_ISSET(cur_repflags, REP_F_CLIENT) &&
+ flags == DB_REP_MASTER) {
+ ret = __repmgr_become_master(env, 0);
+ role_change = 1;
+ }
if (ret != 0)
return (ret);
@@ -574,6 +768,9 @@ __repmgr_restart(env, nthreads, flags)
}
__os_free(env, th);
}
+ /* We will always turn on the inqueue full event after role change. */
+ if (role_change)
+ rep->inqueue_full_event_on = 1;
out: if (locked)
UNLOCK_MUTEX(db_rep->mutex);
@@ -668,7 +865,8 @@ __repmgr_start_selector(env)
* PUBLIC: int __repmgr_close __P((ENV *));
*
* Close repmgr during env close. It stops repmgr, frees sites array and
- * its addresses.
+ * its addresses. Note that it is possible for the sites array to exist
+ * and require deallocation independently of whether repmgr was started.
*/
int
__repmgr_close(env)
@@ -679,10 +877,15 @@ __repmgr_close(env)
int ret;
u_int i;
- db_rep = env->rep_handle;
+ if ((db_rep = env->rep_handle) == NULL)
+ return (0);
ret = 0;
- ret = __repmgr_stop(env);
+ /* Stop repmgr and all of its threads if it was previously started. */
+ if (IS_ENV_REPLICATED(env))
+ ret = __repmgr_stop(env);
+
+ /* Clean up sites array regardless of whether we could stop repmgr. */
if (db_rep->sites != NULL) {
for (i = 0; i < db_rep->site_cnt; i++) {
site = &db_rep->sites[i];
@@ -756,9 +959,9 @@ __repmgr_set_ack_policy(dbenv, policy)
DB_ENV *dbenv;
int policy;
{
+ ENV *env;
DB_REP *db_rep;
DB_THREAD_INFO *ip;
- ENV *env;
REP *rep;
int ret;
@@ -823,6 +1026,208 @@ __repmgr_get_ack_policy(dbenv, policy)
}
/*
+ * PUBLIC: int __repmgr_set_incoming_queue_max __P((DB_ENV *, u_int32_t,
+ * PUBLIC: u_int32_t));
+ *
+ * Sets the maximum amount of dynamic memory used by the Replication Manager
+ * incoming queue.
+ */
+int
+__repmgr_set_incoming_queue_max(dbenv, gbytes, bytes)
+ DB_ENV *dbenv;
+ u_int32_t gbytes, bytes;
+{
+ DB_REP *db_rep;
+ DB_THREAD_INFO *ip;
+ ENV *env;
+ REP *rep;
+
+ env = dbenv->env;
+ db_rep = env->rep_handle;
+ rep = db_rep->region;
+
+ ENV_NOT_CONFIGURED(
+ env, db_rep->region, "DB_ENV->repmgr_set_incoming_queue_max",
+ DB_INIT_REP);
+
+ if (APP_IS_BASEAPI(env)) {
+ __db_errx(env, "%s %s",
+ "DB_ENV->repmgr_set_incoming_queue_max:",
+ "cannot call from base replication application");
+ return (EINVAL);
+ }
+
+ /*
+ * If the caller provided 0 for the size, the size will be unlimited.
+ */
+ if (gbytes == 0 && bytes == 0) {
+ gbytes = UINT32_MAX;
+ bytes = GIGABYTE - 1;
+ }
+
+ while (bytes >= GIGABYTE) {
+ bytes -= GIGABYTE;
+ if (gbytes < UINT32_MAX)
+ gbytes++;
+ }
+
+ if (REP_ON(env)) {
+ ENV_ENTER(env, ip);
+ MUTEX_LOCK(env, rep->mtx_repmgr);
+ rep->inqueue_max_gbytes = gbytes;
+ rep->inqueue_max_bytes = bytes;
+ __repmgr_set_incoming_queue_redzone(rep, gbytes, bytes);
+ MUTEX_UNLOCK(env, rep->mtx_repmgr);
+ ENV_LEAVE(env, ip);
+ } else {
+ db_rep->inqueue_max_gbytes = gbytes;
+ db_rep->inqueue_max_bytes = bytes;
+ }
+
+ /*
+ * Setting incoming queue maximum sizes makes this a replication
+ * manager application.
+ */
+ APP_SET_REPMGR(env);
+ return (0);
+}
+
+/*
+ * PUBLIC: int __repmgr_get_incoming_queue_max __P((DB_ENV *, u_int32_t *,
+ * PUBLIC: u_int32_t *));
+ *
+ * Gets the maximum amount of dynamic memory that can be used by the
+ * Replicaton Manager incoming queue.
+ */
+int
+__repmgr_get_incoming_queue_max(dbenv, gbytesp, bytesp)
+ DB_ENV *dbenv;
+ u_int32_t *gbytesp, *bytesp;
+{
+ ENV *env;
+ DB_THREAD_INFO *ip;
+ DB_REP *db_rep;
+ REP *rep;
+
+ env = dbenv->env;
+ db_rep = env->rep_handle;
+ rep = db_rep->region;
+
+ if (REP_ON(env)) {
+ ENV_ENTER(env, ip);
+ MUTEX_LOCK(env, rep->mtx_repmgr);
+ *gbytesp = rep->inqueue_max_gbytes;
+ *bytesp = rep->inqueue_max_bytes;
+ MUTEX_UNLOCK(env, rep->mtx_repmgr);
+ ENV_LEAVE(env, ip);
+ } else {
+ *gbytesp = db_rep->inqueue_max_gbytes;
+ *bytesp = db_rep->inqueue_max_bytes;
+ }
+
+ return (0);
+}
+
+/*
+ * PUBLIC: void __repmgr_set_incoming_queue_redzone __P((void *, u_int32_t,
+ * PUBLIC: u_int32_t));
+ *
+ * Sets the lower bound of the repmgr incoming queue red zone.
+ * !!! Assumes caller holds mtx_repmgr lock.
+ *
+ * Note that we can't simply get the REP* address from the env as we usually do,
+ * because at the time of this call it may not have been linked into there yet.
+ * Also note that, REP is not a public structure, so we use "void *" here.
+ */
+void __repmgr_set_incoming_queue_redzone(rep_, gbytes, bytes)
+ void *rep_;
+ u_int32_t gbytes, bytes;
+{
+ REP *rep;
+ double rdgbytes, rdbytes;
+
+ rep = rep_;
+
+ /*
+ * We use 'double' values to do the computation for precision, and
+ * to avoid overflow.
+ */
+ rdgbytes = gbytes * 1.00 * DB_REPMGR_INQUEUE_REDZONE_PERCENT / 100.00;
+ rdbytes = (rdgbytes - (u_int32_t)rdgbytes) * GIGABYTE;
+ rdbytes += bytes * 1.00 * DB_REPMGR_INQUEUE_REDZONE_PERCENT / 100.00;
+ if (rdbytes >= GIGABYTE) {
+ rdgbytes += 1;
+ rdbytes -= GIGABYTE;
+ }
+ rep->inqueue_rz_gbytes = (u_int32_t)rdgbytes;
+ rep->inqueue_rz_bytes = (u_int32_t)rdbytes;
+}
+
+/*
+ * PUBLIC: int __repmgr_get_incoming_queue_redzone __P((DB_ENV *,
+ * PUBLIC: u_int32_t *, u_int32_t *));
+ *
+ * Gets the lower bound of the repmgr incoming queue red zone.
+ * This method must be called after environment open.
+ */
+int __repmgr_get_incoming_queue_redzone(dbenv, gbytesp, bytesp)
+ DB_ENV *dbenv;
+ u_int32_t *gbytesp, *bytesp;
+{
+ DB_REP *db_rep;
+ DB_THREAD_INFO *ip;
+ ENV *env;
+ REP *rep;
+
+ env = dbenv->env;
+ db_rep = env->rep_handle;
+ rep = db_rep->region;
+
+ ENV_REQUIRES_CONFIG(
+ env, db_rep->region, "__repmgr_get_incoming_queue_redzone",
+ DB_INIT_REP);
+
+ ENV_ENTER(env, ip);
+ MUTEX_LOCK(env, rep->mtx_repmgr);
+ *gbytesp = rep->inqueue_rz_gbytes;
+ *bytesp = rep->inqueue_rz_bytes;
+ MUTEX_UNLOCK(env, rep->mtx_repmgr);
+ ENV_LEAVE(env, ip);
+
+ return (0);
+}
+
+/*
+ * PUBLIC: int __repmgr_get_incoming_queue_fullevent __P((DB_ENV *,
+ * PUBLIC: int *));
+ *
+ * Return whether the DB_EVENT_REP_INQUEUE_FULL event firing is
+ * turned on or off.
+ * This method must be called after environment open.
+ */
+int __repmgr_get_incoming_queue_fullevent(dbenv, onoffp)
+ DB_ENV *dbenv;
+ int *onoffp;
+{
+ DB_REP *db_rep;
+ ENV *env;
+ REP *rep;
+
+ env = dbenv->env;
+ db_rep = env->rep_handle;
+ rep = db_rep->region;
+
+ ENV_REQUIRES_CONFIG(
+ env, db_rep->region,
+ "DB_ENV->__repmgr_get_incoming_queue_fullevent",
+ DB_INIT_REP);
+
+ *onoffp = rep->inqueue_full_event_on ? 1 : 0;
+
+ return (0);
+}
+
+/*
* PUBLIC: int __repmgr_env_create __P((ENV *, DB_REP *));
*/
int
@@ -837,7 +1242,13 @@ __repmgr_env_create(env, db_rep)
db_rep->connection_retry_wait = DB_REPMGR_DEFAULT_CONNECTION_RETRY;
db_rep->election_retry_wait = DB_REPMGR_DEFAULT_ELECTION_RETRY;
db_rep->config_nsites = 0;
+ ADJUST_AUTOTAKEOVER_WAITS(db_rep, DB_REPMGR_DEFAULT_ACK_TIMEOUT);
db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM;
+ db_rep->inqueue_max_gbytes = 0;
+ db_rep->inqueue_max_bytes = 0;
+#ifdef HAVE_REPLICATION_LISTENER_TAKEOVER
+ FLD_SET(db_rep->config, REP_C_AUTOTAKEOVER);
+#endif
FLD_SET(db_rep->config, REP_C_ELECTIONS);
FLD_SET(db_rep->config, REP_C_2SITE_STRICT);
@@ -846,7 +1257,8 @@ __repmgr_env_create(env, db_rep)
TAILQ_INIT(&db_rep->connections);
TAILQ_INIT(&db_rep->retries);
- db_rep->input_queue.size = 0;
+ db_rep->input_queue.gbytes = 0;
+ db_rep->input_queue.bytes = 0;
STAILQ_INIT(&db_rep->input_queue.header);
__repmgr_env_create_pf(db_rep);
@@ -944,6 +1356,15 @@ __repmgr_await_threads(env)
* of a connector thread.
*/
+ /* Takeover thread. */
+ if (db_rep->takeover_thread != NULL) {
+ if ((t_ret = __repmgr_thread_join(db_rep->takeover_thread)) !=
+ 0 && ret == 0)
+ ret = t_ret;
+ __os_free(env, db_rep->takeover_thread);
+ db_rep->takeover_thread = NULL;
+ }
+
/* Message processing threads. */
for (i = 0;
i < db_rep->nthreads && db_rep->messengers[i] != NULL; i++) {
@@ -1178,7 +1599,7 @@ get_shared_netaddr(env, eid, netaddr)
MUTEX_LOCK(env, rep->mtx_repmgr);
if ((u_int)eid >= rep->site_cnt) {
- ret = DB_NOTFOUND;
+ ret = USR_ERR(env, DB_NOTFOUND);
goto err;
}
DB_ASSERT(env, rep->siteinfo_off != INVALID_ROFF);
@@ -1423,7 +1844,7 @@ send_msg_self(env, iovecs, nmsg)
u_int32_t nmsg;
{
REPMGR_MESSAGE *msg;
- size_t align, bodysize, structsize;
+ size_t align, bodysize, msgsize, structsize;
u_int8_t *membase;
int ret;
@@ -1431,10 +1852,12 @@ send_msg_self(env, iovecs, nmsg)
bodysize = iovecs->total_bytes - __REPMGR_MSG_HDR_SIZE;
structsize = (size_t)DB_ALIGN((size_t)(sizeof(REPMGR_MESSAGE) +
nmsg * sizeof(DBT)), align);
- if ((ret = __os_malloc(env, structsize + bodysize, &membase)) != 0)
+ msgsize = structsize + bodysize;
+ if ((ret = __os_malloc(env, msgsize, &membase)) != 0)
return (ret);
msg = (void*)membase;
+ msg->size = msgsize;
membase += structsize;
/*
@@ -1616,13 +2039,14 @@ __repmgr_send_request(db_channel, request, nrequest, response, timeout, flags)
}
ENV_ENTER(env, ip);
- ret = get_channel_connection(channel, &conn);
- ENV_LEAVE(env, ip);
- if (ret != 0)
- return (ret);
+ if ((ret = get_channel_connection(channel, &conn)) != 0)
+ goto out;
- if (conn == NULL)
- return (request_self(env, request, nrequest, response, flags));
+ /* If conn is NULL, call request_self and then we are done here. */
+ if (conn == NULL) {
+ ret = request_self(env, request, nrequest, response, flags);
+ goto out;
+ }
/* Find an available array slot, or grow the array if necessary. */
LOCK_MUTEX(db_rep->mutex);
@@ -1670,7 +2094,7 @@ __repmgr_send_request(db_channel, request, nrequest, response, timeout, flags)
LOCK_MUTEX(db_rep->mutex);
F_CLR(&conn->responses[i], RESP_IN_USE | RESP_THREAD_WAITING);
UNLOCK_MUTEX(db_rep->mutex);
- return (ret);
+ goto out;
}
timeout = timeout > 0 ? timeout : db_channel->timeout;
@@ -1688,7 +2112,7 @@ __repmgr_send_request(db_channel, request, nrequest, response, timeout, flags)
* to wake up those threads, with a COMPLETE indication and an
* error code. That's more than we want to tackle here.
*/
- return (ret);
+ goto out;
}
/*
@@ -1732,7 +2156,7 @@ __repmgr_send_request(db_channel, request, nrequest, response, timeout, flags)
sz = conn->iovecs.vectors[0].iov_len;
if ((ret = __os_malloc(env, sz, &dummy)) != 0)
- goto out;
+ goto out_unlck;
__repmgr_iovec_init(&conn->iovecs);
DB_INIT_DBT(resp->dbt, dummy, sz);
__repmgr_add_dbt(&conn->iovecs, &resp->dbt);
@@ -1740,8 +2164,9 @@ __repmgr_send_request(db_channel, request, nrequest, response, timeout, flags)
}
}
-out:
+out_unlck:
UNLOCK_MUTEX(db_rep->mutex);
+out: ENV_LEAVE(env, ip);
return (ret);
}
@@ -2168,6 +2593,7 @@ __repmgr_channel_close(dbchan, flags)
{
ENV *env;
DB_REP *db_rep;
+ DB_THREAD_INFO *ip;
REPMGR_CONNECTION *conn;
CHANNEL *channel;
u_int32_t i;
@@ -2182,6 +2608,7 @@ __repmgr_channel_close(dbchan, flags)
* Disable connection(s) (if not already done due to an error having
* occurred previously); release our reference to conn struct(s).
*/
+ ENV_ENTER(env, ip);
LOCK_MUTEX(db_rep->mutex);
if (dbchan->eid >= 0) {
conn = channel->c.conn;
@@ -2218,6 +2645,7 @@ __repmgr_channel_close(dbchan, flags)
__os_free(env, channel);
__os_free(env, dbchan);
+ ENV_LEAVE(env, ip);
return (ret);
}
@@ -2369,29 +2797,26 @@ join_group_at_site(env, addrp)
repmgr_netaddr_t *addrp;
{
DB_REP *db_rep;
+ REP *rep;
REPMGR_CONNECTION *conn;
SITE_STRING_BUFFER addr_buf;
repmgr_netaddr_t addr, myaddr;
__repmgr_gm_fwd_args fwd;
__repmgr_site_info_args site_info;
+ __repmgr_v4site_info_args v4site_info;
u_int8_t *p, *response_buf, siteinfo_buf[MAX_MSG_BUF];
char host_buf[MAXHOSTNAMELEN + 1], *host;
u_int32_t gen, type;
- size_t len;
+ size_t host_len, msg_len, req_len;
int ret, t_ret;
db_rep = env->rep_handle;
+ rep = db_rep->region;
LOCK_MUTEX(db_rep->mutex);
myaddr = SITE_FROM_EID(db_rep->self_eid)->net_addr;
UNLOCK_MUTEX(db_rep->mutex);
- len = strlen(myaddr.host) + 1;
- DB_INIT_DBT(site_info.host, myaddr.host, len);
- site_info.port = myaddr.port;
- site_info.flags = 0;
- ret = __repmgr_site_info_marshal(env,
- &site_info, siteinfo_buf, sizeof(siteinfo_buf), &len);
- DB_ASSERT(env, ret == 0);
+ host_len = strlen(myaddr.host) + 1;
conn = NULL;
response_buf = NULL;
@@ -2399,14 +2824,35 @@ join_group_at_site(env, addrp)
RPRINT(env, (env, DB_VERB_REPMGR_MISC, "try join request to site %s",
__repmgr_format_addr_loc(addrp, addr_buf)));
retry:
- if ((ret = make_request_conn(env, addrp, &conn)) != 0)
+ if ((ret = __repmgr_make_request_conn(env, addrp, &conn)) != 0)
return (ret);
+ DB_ASSERT(env, conn->version > 0 && conn->version <= DB_REPMGR_VERSION);
+ if (conn->version < 5) {
+ DB_INIT_DBT(v4site_info.host, myaddr.host, host_len);
+ v4site_info.port = myaddr.port;
+ v4site_info.flags = 0;
+ ret = __repmgr_v4site_info_marshal(env,
+ &v4site_info, siteinfo_buf, sizeof(siteinfo_buf), &req_len);
+ } else {
+ DB_INIT_DBT(site_info.host, myaddr.host, host_len);
+ site_info.port = myaddr.port;
+ site_info.status = 0;
+ site_info.flags = 0;
+ if (IS_VIEW_SITE(env))
+ FLD_SET(site_info.flags, SITE_VIEW);
+ if (rep->priority > 0)
+ FLD_SET(site_info.flags, SITE_JOIN_ELECTABLE);
+ ret = __repmgr_site_info_marshal(env,
+ &site_info, siteinfo_buf, sizeof(siteinfo_buf), &req_len);
+ }
+ DB_ASSERT(env, ret == 0);
+ /* Preserve separate request length in case there is a retry. */
if ((ret = __repmgr_send_sync_msg(env, conn,
- REPMGR_JOIN_REQUEST, siteinfo_buf, (u_int32_t)len)) != 0)
+ REPMGR_JOIN_REQUEST, siteinfo_buf, (u_int32_t)req_len)) != 0)
goto err;
- if ((ret = read_own_msg(env,
- conn, &type, &response_buf, &len)) != 0)
+ if ((ret = __repmgr_read_own_msg(env,
+ conn, &type, &response_buf, &msg_len)) != 0)
goto err;
if (type == REPMGR_GM_FAILURE) {
@@ -2429,7 +2875,7 @@ retry:
goto err;
ret = __repmgr_gm_fwd_unmarshal(env, &fwd,
- response_buf, len, &p);
+ response_buf, msg_len, &p);
DB_ASSERT(env, ret == 0);
if (fwd.gen > gen) {
if (fwd.host.size > MAXHOSTNAMELEN + 1) {
@@ -2456,7 +2902,8 @@ retry:
}
}
if (type == REPMGR_JOIN_SUCCESS)
- ret = __repmgr_refresh_membership(env, response_buf, len);
+ ret = __repmgr_refresh_membership(env, response_buf, msg_len,
+ conn->version);
else
ret = DB_REP_UNAVAIL; /* Invalid response: protocol violation */
@@ -2476,129 +2923,6 @@ err:
}
/*
- * Reads a whole message, when we expect to get a REPMGR_OWN_MSG.
- */
-static int
-read_own_msg(env, conn, typep, bufp, lenp)
- ENV *env;
- REPMGR_CONNECTION *conn;
- u_int32_t *typep;
- u_int8_t **bufp;
- size_t *lenp;
-{
- __repmgr_msg_hdr_args msg_hdr;
- u_int8_t *buf;
- u_int32_t type;
- size_t size;
- int ret;
-
- __repmgr_reset_for_reading(conn);
- if ((ret = __repmgr_read_conn(conn)) != 0)
- goto err;
- ret = __repmgr_msg_hdr_unmarshal(env, &msg_hdr,
- conn->msg_hdr_buf, __REPMGR_MSG_HDR_SIZE, NULL);
- DB_ASSERT(env, ret == 0);
-
- if ((conn->msg_type = msg_hdr.type) != REPMGR_OWN_MSG) {
- ret = DB_REP_UNAVAIL; /* Protocol violation. */
- goto err;
- }
- type = REPMGR_OWN_MSG_TYPE(msg_hdr);
- if ((size = (size_t)REPMGR_OWN_BUF_SIZE(msg_hdr)) > 0) {
- conn->reading_phase = DATA_PHASE;
- __repmgr_iovec_init(&conn->iovecs);
-
- if ((ret = __os_malloc(env, size, &buf)) != 0)
- goto err;
- conn->input.rep_message = NULL;
-
- __repmgr_add_buffer(&conn->iovecs, buf, size);
- if ((ret = __repmgr_read_conn(conn)) != 0) {
- __os_free(env, buf);
- goto err;
- }
- *bufp = buf;
- }
-
- *typep = type;
- *lenp = size;
-
-err:
- return (ret);
-}
-
-static int
-make_request_conn(env, addr, connp)
- ENV *env;
- repmgr_netaddr_t *addr;
- REPMGR_CONNECTION **connp;
-{
- DBT vi;
- __repmgr_msg_hdr_args msg_hdr;
- __repmgr_version_confirmation_args conf;
- REPMGR_CONNECTION *conn;
- int alloc, ret, unused;
-
- alloc = FALSE;
- if ((ret = __repmgr_connect(env, addr, &conn, &unused)) != 0)
- return (ret);
- conn->type = APP_CONNECTION;
-
- /* Read a handshake msg, to get version confirmation and parameters. */
- if ((ret = __repmgr_read_conn(conn)) != 0)
- goto err;
- /*
- * We can only get here after having read the full 9 bytes that we
- * expect, so this can't fail.
- */
- DB_ASSERT(env, conn->reading_phase == SIZES_PHASE);
- ret = __repmgr_msg_hdr_unmarshal(env, &msg_hdr,
- conn->msg_hdr_buf, __REPMGR_MSG_HDR_SIZE, NULL);
- DB_ASSERT(env, ret == 0);
- __repmgr_iovec_init(&conn->iovecs);
- conn->reading_phase = DATA_PHASE;
-
- if ((ret = __repmgr_prepare_simple_input(env, conn, &msg_hdr)) != 0)
- goto err;
- alloc = TRUE;
-
- if ((ret = __repmgr_read_conn(conn)) != 0)
- goto err;
-
- /*
- * Analyze the handshake msg, and stash relevant info.
- */
- if ((ret = __repmgr_find_version_info(env, conn, &vi)) != 0)
- goto err;
- DB_ASSERT(env, vi.size > 0);
- if ((ret = __repmgr_version_confirmation_unmarshal(env,
- &conf, vi.data, vi.size, NULL)) != 0)
- goto err;
-
- if (conf.version < GM_MIN_VERSION) {
- ret = DB_REP_UNAVAIL;
- goto err;
- }
- conn->version = conf.version;
-
-err:
- if (alloc) {
- DB_ASSERT(env, conn->input.repmgr_msg.cntrl.size > 0);
- __os_free(env, conn->input.repmgr_msg.cntrl.data);
- DB_ASSERT(env, conn->input.repmgr_msg.rec.size > 0);
- __os_free(env, conn->input.repmgr_msg.rec.data);
- }
- __repmgr_reset_for_reading(conn);
- if (ret == 0)
- *connp = conn;
- else {
- (void)__repmgr_close_connection(env, conn);
- (void)__repmgr_destroy_conn(env, conn);
- }
- return (ret);
-}
-
-/*
* PUBLIC: int __repmgr_site __P((DB_ENV *,
* PUBLIC: const char *, u_int, DB_SITE **, u_int32_t));
*/
@@ -2640,9 +2964,9 @@ site_by_addr(env, host, port, sitep)
if ((ret = addr_chk(env, host, port)) != 0)
return (ret);
+ ENV_ENTER(env, ip);
if (REP_ON(env)) {
LOCK_MUTEX(db_rep->mutex);
- ENV_ENTER(env, ip);
locked = TRUE;
} else
locked = FALSE;
@@ -2654,10 +2978,9 @@ site_by_addr(env, host, port, sitep)
* we want the DB_SITE handle to point to; just like site_by_eid() does.
*/
host = site->net_addr.host;
- if (locked) {
- ENV_LEAVE(env, ip);
+ if (locked)
UNLOCK_MUTEX(db_rep->mutex);
- }
+ ENV_LEAVE(env, ip);
if (ret != 0)
return (ret);
@@ -2723,7 +3046,7 @@ init_dbsite(env, eid, host, port, sitep)
dbsite->get_address = __repmgr_get_site_address;
dbsite->get_config = __repmgr_get_config;
dbsite->get_eid = __repmgr_get_eid;
- dbsite->set_config = __repmgr_site_config;
+ dbsite->set_config = __repmgr_site_config_pp;
dbsite->remove = __repmgr_remove_site_pp;
dbsite->close = __repmgr_site_close;
@@ -2756,9 +3079,16 @@ __repmgr_get_eid(dbsite, eidp)
DB_SITE *dbsite;
int *eidp;
{
+ DB_THREAD_INFO *ip;
+ ENV *env;
int ret;
- if ((ret = refresh_site(dbsite)) != 0)
+ env = dbsite->env;
+
+ ENV_ENTER(env, ip);
+ ret = refresh_site(dbsite);
+ ENV_LEAVE(env, ip);
+ if (ret != 0)
return (ret);
if (F_ISSET(dbsite, DB_SITE_PREOPEN)) {
@@ -2791,8 +3121,11 @@ __repmgr_get_config(dbsite, which, valuep)
env = dbsite->env;
db_rep = env->rep_handle;
- if ((ret = refresh_site(dbsite)) != 0)
+ ENV_ENTER(env, ip);
+ if ((ret = refresh_site(dbsite)) != 0) {
+ ENV_LEAVE(env, ip);
return (ret);
+ }
LOCK_MUTEX(db_rep->mutex);
DB_ASSERT(env, IS_VALID_EID(dbsite->eid));
site = SITE_FROM_EID(dbsite->eid);
@@ -2800,32 +3133,52 @@ __repmgr_get_config(dbsite, which, valuep)
rep = db_rep->region;
infop = env->reginfo;
- ENV_ENTER(env, ip);
MUTEX_LOCK(env, rep->mtx_repmgr);
sites = R_ADDR(infop, rep->siteinfo_off);
site->config = sites[dbsite->eid].config;
MUTEX_UNLOCK(env, rep->mtx_repmgr);
- ENV_LEAVE(env, ip);
}
*valuep = FLD_ISSET(site->config, which) ? 1 : 0;
UNLOCK_MUTEX(db_rep->mutex);
+ ENV_LEAVE(env, ip);
return (0);
}
/*
- * PUBLIC: int __repmgr_site_config __P((DB_SITE *, u_int32_t, u_int32_t));
+ * PUBLIC: int __repmgr_site_config_pp __P((DB_SITE *, u_int32_t, u_int32_t));
*/
int
-__repmgr_site_config(dbsite, which, value)
+__repmgr_site_config_pp(dbsite, which, value)
DB_SITE *dbsite;
u_int32_t which;
u_int32_t value;
{
- DB_REP *db_rep;
DB_THREAD_INFO *ip;
ENV *env;
+ int ret;
+
+ env = dbsite->env;
+
+ ENV_ENTER(env, ip);
+ ret = __repmgr_site_config_int(dbsite, which, value);
+ ENV_LEAVE(env, ip);
+
+ return (ret);
+}
+
+/*
+ * PUBLIC: int __repmgr_site_config_int __P((DB_SITE *, u_int32_t, u_int32_t));
+ */
+int
+__repmgr_site_config_int(dbsite, which, value)
+ DB_SITE *dbsite;
+ u_int32_t which;
+ u_int32_t value;
+{
+ DB_REP *db_rep;
+ ENV *env;
REGINFO *infop;
REP *rep;
REPMGR_SITE *site;
@@ -2875,7 +3228,6 @@ __repmgr_site_config(dbsite, which, value)
infop = env->reginfo;
LOCK_MUTEX(db_rep->mutex);
- ENV_ENTER(env, ip);
MUTEX_LOCK(env, rep->mtx_repmgr);
sites = R_ADDR(infop, rep->siteinfo_off);
site = SITE_FROM_EID(dbsite->eid);
@@ -2896,7 +3248,6 @@ __repmgr_site_config(dbsite, which, value)
rep->siteinfo_seq++;
}
MUTEX_UNLOCK(env, rep->mtx_repmgr);
- ENV_LEAVE(env, ip);
UNLOCK_MUTEX(db_rep->mutex);
} else {
site = SITE_FROM_EID(dbsite->eid);
@@ -2930,7 +3281,6 @@ set_local_site(dbsite, value)
if (REP_ON(env)) {
rep = db_rep->region;
LOCK_MUTEX(db_rep->mutex);
- ENV_ENTER(env, ip);
MUTEX_LOCK(env, rep->mtx_repmgr);
locked = TRUE;
/* Make sure we're in sync first. */
@@ -2941,31 +3291,32 @@ set_local_site(dbsite, value)
__db_errx(env, DB_STR("3666",
"A previously given local site may not be unset"));
ret = EINVAL;
- } else if (IS_VALID_EID(db_rep->self_eid) &&
- db_rep->self_eid != dbsite->eid) {
- __db_errx(env, DB_STR("3667",
- "A (different) local site has already been set"));
- ret = EINVAL;
- } else {
- DB_ASSERT(env, IS_VALID_EID(dbsite->eid));
- site = SITE_FROM_EID(dbsite->eid);
- if (FLD_ISSET(site->config,
- DB_BOOTSTRAP_HELPER | DB_REPMGR_PEER)) {
- __db_errx(env, DB_STR("3668",
- "Local site cannot have HELPER or PEER attributes"));
+ } else if (value) {
+ if (IS_VALID_EID(db_rep->self_eid) &&
+ db_rep->self_eid != dbsite->eid) {
+ __db_errx(env, DB_STR("3697",
+ "A (different) local site has already been set"));
ret = EINVAL;
+ } else {
+ DB_ASSERT(env, IS_VALID_EID(dbsite->eid));
+ site = SITE_FROM_EID(dbsite->eid);
+ if (FLD_ISSET(site->config,
+ DB_BOOTSTRAP_HELPER | DB_REPMGR_PEER)) {
+ __db_errx(env, DB_STR("3698",
+ "Local site cannot have HELPER or PEER attributes"));
+ ret = EINVAL;
+ }
}
}
- if (ret == 0) {
+ if (ret == 0 && value) {
db_rep->self_eid = dbsite->eid;
if (locked) {
- rep->self_eid = dbsite->eid;
+ rep->self_eid = db_rep->self_eid;
rep->siteinfo_seq++;
}
}
if (locked) {
MUTEX_UNLOCK(env, rep->mtx_repmgr);
- ENV_LEAVE(env, ip);
UNLOCK_MUTEX(db_rep->mutex);
}
return (ret);
@@ -2998,7 +3349,7 @@ refresh_site(dbsite)
}
static int
-__repmgr_remove_site_pp(dbsite)
+__repmgr_remove_and_close_site(dbsite)
DB_SITE *dbsite;
{
int ret, t_ret;
@@ -3011,6 +3362,23 @@ __repmgr_remove_site_pp(dbsite)
*/
if ((t_ret = __repmgr_site_close(dbsite)) != 0 && ret == 0)
ret = t_ret;
+
+ return (ret);
+}
+
+static int
+__repmgr_remove_site_pp(dbsite)
+ DB_SITE *dbsite;
+{
+ ENV *env;
+ DB_THREAD_INFO *ip;
+ int ret;
+
+ env = dbsite->env;
+
+ ENV_ENTER(env, ip);
+ ret = __repmgr_remove_and_close_site(dbsite);
+ ENV_LEAVE(env, ip);
return (ret);
}
@@ -3024,6 +3392,7 @@ __repmgr_remove_site(dbsite)
REPMGR_CONNECTION *conn;
repmgr_netaddr_t addr;
__repmgr_site_info_args site_info;
+ __repmgr_v4site_info_args v4site_info;
u_int8_t *response_buf, siteinfo_buf[MAX_MSG_BUF];
size_t len;
u_int32_t type;
@@ -3046,23 +3415,33 @@ __repmgr_remove_site(dbsite)
DB_ASSERT(env, IS_VALID_EID(master));
addr = SITE_FROM_EID(master)->net_addr;
UNLOCK_MUTEX(db_rep->mutex);
-
len = strlen(dbsite->host) + 1;
- DB_INIT_DBT(site_info.host, dbsite->host, len);
- site_info.port = dbsite->port;
- site_info.flags = 0;
- ret = __repmgr_site_info_marshal(env,
- &site_info, siteinfo_buf, sizeof(siteinfo_buf), &len);
- DB_ASSERT(env, ret == 0);
conn = NULL;
response_buf = NULL;
- if ((ret = make_request_conn(env, &addr, &conn)) != 0)
+ if ((ret = __repmgr_make_request_conn(env, &addr, &conn)) != 0)
return (ret);
+ DB_ASSERT(env, conn->version > 0 && conn->version <= DB_REPMGR_VERSION);
+ if (conn->version < 5) {
+ DB_INIT_DBT(v4site_info.host, dbsite->host, len);
+ v4site_info.port = dbsite->port;
+ v4site_info.flags = 0;
+ ret = __repmgr_v4site_info_marshal(env,
+ &v4site_info, siteinfo_buf, sizeof(siteinfo_buf), &len);
+ } else {
+ DB_INIT_DBT(site_info.host, dbsite->host, len);
+ site_info.port = dbsite->port;
+ site_info.status = 0;
+ site_info.flags = 0;
+ ret = __repmgr_site_info_marshal(env,
+ &site_info, siteinfo_buf, sizeof(siteinfo_buf), &len);
+ }
+ DB_ASSERT(env, ret == 0);
+
if ((ret = __repmgr_send_sync_msg(env, conn,
REPMGR_REMOVE_REQUEST, siteinfo_buf, (u_int32_t)len)) != 0)
goto err;
- if ((ret = read_own_msg(env,
+ if ((ret = __repmgr_read_own_msg(env,
conn, &type, &response_buf, &len)) != 0)
goto err;
ret = type == REPMGR_REMOVE_SUCCESS ? 0 : DB_REP_UNAVAIL;
@@ -3090,3 +3469,82 @@ __repmgr_site_close(dbsite)
__os_free(dbsite->env, dbsite);
return (0);
}
+
+/*
+ * Demotes a participant site to a view. This is a one-way and one-time
+ * operation.
+ *
+ * The demotion occurs at the very end of repmgr_start() because it
+ * requires a select thread to perform the gmdb operations that remove
+ * the site from the replication group and immediately add the site back
+ * into the group as a view. The demotion also preserves any other threads
+ * created by repmgr_start() so that they are there to be used by the
+ * demoted site after it is re-added as a view site.
+ *
+ * We remove and re-add the site to propagate the site's change from
+ * participant to view to all sites in the replication group. This includes
+ * updates to each site's gmdb and in-memory site list.
+ */
+#define REPMGR_DEMOTION_MASTER_RETRIES 10
+#define REPMGR_DEMOTION_RETRY_USECS 500000
+static int
+__repmgr_demote_site(env, eid)
+ ENV *env;
+ int eid;
+{
+ DB_REP *db_rep;
+ DB_SITE *dbsite;
+ REP *rep;
+ REPMGR_SITE *site;
+ int ret, t_ret, tries;
+
+ db_rep = env->rep_handle;
+ rep = db_rep->region;
+ site = SITE_FROM_EID(eid);
+ dbsite = NULL;
+
+ /* Inform other repmgr threads that a demotion is in progress. */
+ db_rep->demotion_pending = TRUE;
+
+ if ((ret = init_dbsite(env, eid, site->net_addr.host,
+ site->net_addr.port, &dbsite)) != 0)
+ goto err;
+
+ /*
+ * We need a master to perform the gmdb updates. Poll periodically
+ * for a limited time to find one.
+ */
+ tries = 0;
+ while (rep->master_id == DB_EID_INVALID) {
+ __os_yield(env, 0, REPMGR_DEMOTION_RETRY_USECS);
+ if (++tries >= REPMGR_DEMOTION_MASTER_RETRIES) {
+ ret = DB_REP_UNAVAIL;
+ goto err;
+ }
+ }
+
+ /* Remove site from replication group. */
+ if ((ret = __repmgr_remove_site(dbsite)) != 0)
+ goto err;
+
+ /*
+ * Add site back into replication group as a view. This demotion is
+ * occurring because this site now has a view callback but its
+ * SITE_VIEW flag is not set. Now, __repmgr_join_group() will detect
+ * the view callback and set the SITE_VIEW flag before sending this
+ * site's information to the rest of the replication group.
+ */
+ if ((ret = __repmgr_join_group(env)) != 0)
+ goto err;
+
+err:
+ /* Deallocates dbsite. */
+ if (dbsite != NULL) {
+ t_ret = __repmgr_site_close(dbsite);
+ if (ret == 0 && t_ret != 0)
+ ret = t_ret;
+ }
+ /* Must reset demotion_pending before leaving this routine. */
+ db_rep->demotion_pending = FALSE;
+ return (ret);
+}
diff --git a/src/repmgr/repmgr_msg.c b/src/repmgr/repmgr_msg.c
index 13537823..71cb2ada 100644
--- a/src/repmgr/repmgr_msg.c
+++ b/src/repmgr/repmgr_msg.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2005, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2005, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -15,15 +15,19 @@
#include "dbinc_auto/repmgr_auto.h"
static int dispatch_app_message __P((ENV *, REPMGR_MESSAGE *));
-static int finish_gmdb_update __P((ENV *,
- DB_THREAD_INFO *, DBT *, u_int32_t, u_int32_t, __repmgr_member_args *));
+static int finish_gmdb_update __P((ENV *, DB_THREAD_INFO *,
+ DBT *, u_int32_t, u_int32_t, u_int32_t, __repmgr_member_args *));
static int incr_gm_version __P((ENV *, DB_THREAD_INFO *, DB_TXN *));
-static void marshal_site_data __P((ENV *, u_int32_t, u_int8_t *, DBT *));
+static void marshal_site_data __P((ENV *,
+ u_int32_t, u_int32_t, u_int8_t *, DBT *));
static void marshal_site_key __P((ENV *,
repmgr_netaddr_t *, u_int8_t *, DBT *, __repmgr_member_args *));
static int message_loop __P((ENV *, REPMGR_RUNNABLE *));
+static int preferred_master_takeover __P((ENV*));
static int process_message __P((ENV*, DBT*, DBT*, int));
static int reject_fwd __P((ENV *, REPMGR_CONNECTION *));
+static int rejoin_connections(ENV *);
+static int rejoin_deferred_election(ENV *);
static int rescind_pending __P((ENV *,
DB_THREAD_INFO *, int, u_int32_t, u_int32_t));
static int resolve_limbo_int __P((ENV *, DB_THREAD_INFO *));
@@ -33,9 +37,13 @@ static int send_permlsn_conn __P((ENV *,
REPMGR_CONNECTION *, u_int32_t, DB_LSN *));
static int serve_join_request __P((ENV *,
DB_THREAD_INFO *, REPMGR_MESSAGE *));
+static int serve_lsnhist_request __P((ENV *, DB_THREAD_INFO *,
+ REPMGR_MESSAGE *));
+static int serve_readonly_master_request __P((ENV *, REPMGR_MESSAGE *));
static int serve_remove_request __P((ENV *,
DB_THREAD_INFO *, REPMGR_MESSAGE *));
static int serve_repmgr_request __P((ENV *, REPMGR_MESSAGE *));
+static int serve_restart_client_request __P((ENV *, REPMGR_MESSAGE *));
/*
* Map one of the phase-1/provisional membership status values to its
@@ -72,6 +80,7 @@ message_loop(env, th)
REPMGR_RUNNABLE *th;
{
DB_REP *db_rep;
+ DB_THREAD_INFO *ip;
REP *rep;
REPMGR_MESSAGE *msg;
REPMGR_CONNECTION *conn;
@@ -83,6 +92,7 @@ message_loop(env, th)
COMPQUIET(membership, 0);
db_rep = env->rep_handle;
rep = db_rep->region;
+ ENV_ENTER(env, ip);
LOCK_MUTEX(db_rep->mutex);
while ((ret = __repmgr_queue_get(env, &msg, th)) == 0) {
incremented = FALSE;
@@ -141,7 +151,21 @@ message_loop(env, th)
* detect it without the need for application
* activity.
*/
- ret = __rep_flush(env->dbenv);
+ ret = __rep_flush_int(env);
+ } else if (db_rep->prefmas_pending == master_switch &&
+ IS_PREFMAS_MODE(env) &&
+ FLD_ISSET(rep->config, REP_C_PREFMAS_MASTER) &&
+ F_ISSET(rep, REP_F_CLIENT)) {
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+"message_loop heartbeat preferred master switch"));
+ /*
+ * We are a preferred master site currently
+ * running as a client and we have finished
+ * syncing with the temporary master. It is
+ * now time to take over as master.
+ */
+ db_rep->prefmas_pending = no_action;
+ ret = preferred_master_takeover(env);
} else {
/*
* Use heartbeat message to initiate rerequest
@@ -162,6 +186,12 @@ message_loop(env, th)
db_rep->non_rep_th--;
if (ret != 0)
goto out;
+ if (db_rep->view_mismatch) {
+ __db_errx(env, DB_STR("3699",
+ "Site is not recorded as a view in the group membership database"));
+ ret = EINVAL;
+ goto out;
+ }
}
/*
* A return of DB_REP_UNAVAIL from __repmgr_queue_get() merely means we
@@ -171,6 +201,7 @@ message_loop(env, th)
ret = 0;
out:
UNLOCK_MUTEX(db_rep->mutex);
+ ENV_LEAVE(env, ip);
return (ret);
}
@@ -341,16 +372,45 @@ process_message(env, control, rec, eid)
break;
case DB_REP_DUPMASTER:
- /*
- * Initiate an election if we're configured to be using
- * elections, but only if we're *NOT* using leases. When using
- * leases, there is never any uncertainty over which site is the
- * rightful master, and only the loser gets the DUPMASTER return
- * code.
- */
- if ((ret = __repmgr_become_client(env)) == 0 &&
+ if (IS_PREFMAS_MODE(env) &&
+ FLD_ISSET(rep->config, REP_C_PREFMAS_MASTER)) {
+ /*
+ * The preferred master site must restart as a master
+ * so that it sends out a NEWMASTER to help the client
+ * sync. It must force a role change so that it
+ * advances its gen even though it is already master.
+ * This is needed if there was a temporary master at
+ * a higher gen that is now restarting as a client.
+ * A client won't process messages from a master at
+ * a lower gen than its own.
+ */
+ ret = __repmgr_repstart(env, DB_REP_MASTER,
+ REP_START_FORCE_ROLECHG);
+ } else if (IS_PREFMAS_MODE(env) &&
+ FLD_ISSET(rep->config, REP_C_PREFMAS_CLIENT) &&
+ (ret = __repmgr_become_client(env)) == 0) {
+ /*
+ * The preferred master client site must restart as
+ * client without any elections to enable the preferred
+ * master site to preserve its own transactions. It
+ * uses an election thread to repeatedly perform client
+ * startups so that it will perform its client sync
+ * when the preferred master's gen has caught up.
+ */
+ LOCK_MUTEX(db_rep->mutex);
+ ret = __repmgr_init_election(env,
+ ELECT_F_CLIENT_RESTART);
+ UNLOCK_MUTEX(db_rep->mutex);
+ } else if ((ret = __repmgr_become_client(env)) == 0 &&
FLD_ISSET(rep->config, REP_C_LEASE | REP_C_ELECTIONS)
== REP_C_ELECTIONS) {
+ /*
+ * Initiate an election if we're configured to be using
+ * elections, but only if we're *NOT* using leases.
+ * When using leases, there is never any uncertainty
+ * over which site is the rightful master, and only the
+ * loser gets the DUPMASTER return code.
+ */
LOCK_MUTEX(db_rep->mutex);
ret = __repmgr_init_election(env, ELECT_F_IMMED);
UNLOCK_MUTEX(db_rep->mutex);
@@ -406,6 +466,14 @@ DB_TEST_RECOVERY_LABEL
t_ret = __op_rep_exit(env);
if (ret == ENOENT)
ret = 0;
+ else if (ret == DB_DELETED && db_rep->demotion_pending)
+ /*
+ * If a demotion is in progress, we want to keep
+ * the repmgr threads instead of bowing out because
+ * they are needed when we rejoin the replication group
+ * immediately as a view.
+ */
+ ret = 0;
else if (ret == DB_DELETED)
ret = __repmgr_bow_out(env);
if (t_ret != 0 && ret == 0)
@@ -428,8 +496,10 @@ __repmgr_handle_event(env, event, info)
void *info;
{
DB_REP *db_rep;
+ REP *rep;
db_rep = env->rep_handle;
+ rep = db_rep->region;
if (db_rep->selector == NULL) {
/* Repmgr is not in use, so all events go to application. */
@@ -457,9 +527,46 @@ __repmgr_handle_event(env, event, info)
/* Application still needs to see this. */
break;
+ case DB_EVENT_REP_MASTER:
+ case DB_EVENT_REP_STARTUPDONE:
+ /*
+ * Detect a rare case where a dupmaster or incomplete gmdb
+ * operation has left the site's gmdb inconsistent with
+ * a view callback definition. The user would have correctly
+ * defined a view callback and called repmgr_start(), but the
+ * gmdb operation to update this site to a view would have been
+ * incomplete or rolled back. The site cannot operate in this
+ * inconsistent state, so set an indicator to cause a message
+ * thread to panic and terminate.
+ *
+ * The one exception is during a demotion to view, when
+ * this inconsistency is expected for a short time.
+ */
+ if (IS_VALID_EID(db_rep->self_eid) &&
+ PARTICIPANT_TO_VIEW(db_rep,
+ SITE_FROM_EID(db_rep->self_eid)) &&
+ !db_rep->demotion_pending)
+ db_rep->view_mismatch = TRUE;
+
+ /*
+ * In preferred master mode, when the preferred master site
+ * finishes synchronizing with the temporary master it must
+ * prepare to take over as master. This is detected by the
+ * next heartbeat in a message thread, where the takeover is
+ * actually performed.
+ */
+ if (event == DB_EVENT_REP_STARTUPDONE &&
+ IS_PREFMAS_MODE(env) &&
+ FLD_ISSET(rep->config, REP_C_PREFMAS_MASTER)) {
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "startupdone set preferred master switch"));
+ db_rep->prefmas_pending = master_switch;
+ }
+ break;
default:
break;
}
+ COMPQUIET(info, NULL);
return (DB_EVENT_NOT_HANDLED);
}
@@ -504,7 +611,7 @@ send_permlsn(env, generation, lsn)
*/
policy = site->ack_policy > 0 ?
site->ack_policy : rep->perm_policy;
- if (policy == DB_REPMGR_ACKS_NONE ||
+ if (IS_VIEW_SITE(env) || policy == DB_REPMGR_ACKS_NONE ||
(IS_PEER_POLICY(policy) && rep->priority == 0))
ack = FALSE;
else
@@ -614,26 +721,149 @@ send_permlsn_conn(env, conn, generation, lsn)
return (ret);
}
+/*
+ * Perform the steps on the preferred master site to take over again as
+ * preferred master from a temporary master. This routine should only be
+ * called after the preferred master has restarted as a client and finished
+ * a client sync with the temporary master.
+ *
+ * This routine makes a best effort to wait until all temporary master
+ * transactions have been applied on this site before taking over.
+ */
+static int
+preferred_master_takeover(env)
+ ENV *env;
+{
+ DB_LOG *dblp;
+ DB_REP *db_rep;
+ LOG *lp;
+ REP *rep;
+ DB_LSN last_ready_lsn, ready_lsn, sync_lsn;
+ u_long usec;
+ u_int32_t gen, max_tries, tries;
+ int ret, synced;
+
+ dblp = env->lg_handle;
+ lp = dblp->reginfo.primary;
+ db_rep = env->rep_handle;
+ rep = db_rep->region;
+ gen = 0;
+ ZERO_LSN(sync_lsn);
+ ret = 0;
+
+ if (!IS_PREFMAS_MODE(env))
+ return (ret);
+
+ /*
+ * Start by making the temporary master a readonly master so that we
+ * can know when we have applied all of its transactions on this
+ * site before taking over.
+ */
+ if ((ret = __repmgr_make_site_readonly_master(env,
+ 1, &gen, &sync_lsn)) != 0)
+ return (ret);
+ DB_ASSERT(env, gen >= rep->gen);
+
+ /*
+ * Make a best effort to wait until this site has all transactions
+ * from the temporary master. We want to preserve temporary master
+ * transactions, but we can't wait forever. If we exceed our wait,
+ * we restart this site as preferred master anyway. This may
+ * sacrifice some temporary master transactions in order to preserve
+ * repgroup write availability.
+ *
+ * We restart the number of tries each time we make progress in
+ * transactions applied, until either we apply through sync_lsn or
+ * we exceed max_tries without progress.
+ */
+ if ((ret = __repmgr_prefmas_get_wait(env, &max_tries, &usec)) != 0)
+ return (ret);
+ tries = 0;
+ synced = 0;
+ ZERO_LSN(ready_lsn);
+ ZERO_LSN(last_ready_lsn);
+ while (!synced && tries < max_tries) {
+ __os_yield(env, 0, usec);
+ tries++;
+ /*
+ * lp->ready_lsn is the next LSN we expect to receive,
+ * which also indicates how much we've applied. sync_lsn
+ * is the lp->lsn (indicating the next log record expected)
+ * from the other site.
+ */
+ MUTEX_LOCK(env, rep->mtx_clientdb);
+ ready_lsn = lp->ready_lsn;
+ MUTEX_UNLOCK(env, rep->mtx_clientdb);
+ if (gen == rep->gen && LOG_COMPARE(&ready_lsn, &sync_lsn) >= 0)
+ synced = 1;
+ else if (LOG_COMPARE(&ready_lsn, &last_ready_lsn) >= 0) {
+ /* We are making progress, restart number of tries. */
+ last_ready_lsn = ready_lsn;
+ tries = 0;
+ }
+ }
+
+ /* Restart the remote readonly temporary master as a client. */
+ if ((ret = __repmgr_restart_site_as_client(env, 1)) != 0)
+ return (ret);
+
+ /* Restart this site as the preferred master, waiting for
+ * REP_LOCKOUT_MSG. The NEWCLIENT message sent back from
+ * restarting the other site as client can briefly lock
+ * REP_LOCKOUT_MSG to do some cleanup. We don't want this
+ * to cause the rep_start_int() call to restart this site
+ * as master to return 0 without doing anything.
+ */
+ ret = __repmgr_become_master(env, REP_START_WAIT_LOCKMSG);
+ return (ret);
+}
+
static int
serve_repmgr_request(env, msg)
ENV *env;
REPMGR_MESSAGE *msg;
{
- DB_THREAD_INFO *ip;
+ DB_REP *db_rep;
DBT *dbt;
+ DB_THREAD_INFO *ip;
REPMGR_CONNECTION *conn;
+ u_int32_t mtype;
int ret, t_ret;
- ENV_ENTER(env, ip);
- switch (REPMGR_OWN_MSG_TYPE(msg->msg_hdr)) {
+ db_rep = env->rep_handle;
+ ENV_GET_THREAD_INFO(env, ip);
+ conn = msg->v.gmdb_msg.conn;
+ mtype = REPMGR_OWN_MSG_TYPE(msg->msg_hdr);
+ switch (mtype) {
case REPMGR_JOIN_REQUEST:
ret = serve_join_request(env, ip, msg);
break;
+ case REPMGR_LSNHIST_REQUEST:
+ ret = serve_lsnhist_request(env, ip, msg);
+ break;
+ case REPMGR_READONLY_MASTER:
+ ret = serve_readonly_master_request(env, msg);
+ break;
case REPMGR_REJOIN:
RPRINT(env, (env, DB_VERB_REPMGR_MISC,
"One try at rejoining group automatically"));
if ((ret = __repmgr_join_group(env)) == DB_REP_UNAVAIL)
ret = __repmgr_bow_out(env);
+ else if (ret == 0 && IS_PREFMAS_MODE(env)) {
+ /*
+ * For preferred master mode, we need to get
+ * a "regular" connection to the other site without
+ * calling an election prematurely here.
+ */
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "Establishing connections after rejoin"));
+ ret = rejoin_connections(env);
+ } else if (ret == 0 && db_rep->rejoin_pending) {
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "Calling deferred election after rejoin"));
+ ret = rejoin_deferred_election(env);
+ }
+ db_rep->rejoin_pending = FALSE;
break;
case REPMGR_REMOVE_REQUEST:
ret = serve_remove_request(env, ip, msg);
@@ -641,23 +871,32 @@ serve_repmgr_request(env, msg)
case REPMGR_RESOLVE_LIMBO:
ret = resolve_limbo_wrapper(env, ip);
break;
+ case REPMGR_RESTART_CLIENT:
+ ret = serve_restart_client_request(env, msg);
+ break;
case REPMGR_SHARING:
dbt = &msg->v.gmdb_msg.request;
- ret = __repmgr_refresh_membership(env, dbt->data, dbt->size);
+ ret = __repmgr_refresh_membership(env, dbt->data, dbt->size,
+ (conn == NULL ? DB_REPMGR_VERSION : conn->version));
break;
default:
ret = __db_unknown_path(env, "serve_repmgr_request");
break;
}
- if ((conn = msg->v.gmdb_msg.conn) != NULL) {
+ if (conn != NULL) {
+ /*
+ * A site that removed itself may have already closed its
+ * connections. Do not return an error and panic if we
+ * can't close the one-shot GMDB connection for a remove
+ * request here.
+ */
if ((t_ret = __repmgr_close_connection(env, conn)) != 0 &&
- ret == 0)
+ ret == 0 && mtype != REPMGR_REMOVE_REQUEST)
ret = t_ret;
if ((t_ret = __repmgr_decr_conn_ref(env, conn)) != 0 &&
ret == 0)
ret = t_ret;
}
- ENV_LEAVE(env, ip);
return (ret);
}
@@ -674,8 +913,10 @@ serve_join_request(env, ip, msg)
{
DB_REP *db_rep;
REPMGR_CONNECTION *conn;
+ REPMGR_SITE *site;
DBT *dbt;
__repmgr_site_info_args site_info;
+ __repmgr_v4site_info_args v4site_info;
u_int8_t *buf;
char *host;
size_t len;
@@ -686,9 +927,18 @@ serve_join_request(env, ip, msg)
COMPQUIET(status, 0);
conn = msg->v.gmdb_msg.conn;
+ DB_ASSERT(env, conn->version > 0 && conn->version <= DB_REPMGR_VERSION);
dbt = &msg->v.gmdb_msg.request;
- ret = __repmgr_site_info_unmarshal(env,
- &site_info, dbt->data, dbt->size, NULL);
+ if (conn->version < 5) {
+ ret = __repmgr_v4site_info_unmarshal(env,
+ &v4site_info, dbt->data, dbt->size, NULL);
+ site_info.host = v4site_info.host;
+ site_info.port = v4site_info.port;
+ site_info.status = v4site_info.flags;
+ site_info.flags = 0;
+ } else
+ ret = __repmgr_site_info_unmarshal(env,
+ &site_info, dbt->data, dbt->size, NULL);
host = site_info.host.data;
host[site_info.host.size - 1] = '\0';
@@ -703,7 +953,23 @@ serve_join_request(env, ip, msg)
LOCK_MUTEX(db_rep->mutex);
if ((ret = __repmgr_find_site(env, host, site_info.port, &eid)) == 0) {
DB_ASSERT(env, eid != db_rep->self_eid);
- status = SITE_FROM_EID(eid)->membership;
+ site = SITE_FROM_EID(eid);
+ status = site->membership;
+ /*
+ * Remote site electability is usually exchanged when
+ * a connection is established, but when a new site
+ * joins the repgroup there is a brief gap between the
+ * join and the connection. Record electability for
+ * the joining site so that we are not overly conservative
+ * about the number of acks we require for a PERM
+ * transaction if the joining site is unelectable.
+ */
+ if (FLD_ISSET(site_info.flags, SITE_JOIN_ELECTABLE)) {
+ F_SET(site, SITE_ELECTABLE);
+ FLD_CLR(site_info.flags, SITE_JOIN_ELECTABLE);
+ } else
+ F_CLR(site, SITE_ELECTABLE);
+ F_SET(site, SITE_HAS_PRIO);
}
UNLOCK_MUTEX(db_rep->mutex);
if (ret != 0)
@@ -712,7 +978,8 @@ serve_join_request(env, ip, msg)
switch (status) {
case 0:
case SITE_ADDING:
- ret = __repmgr_update_membership(env, ip, eid, SITE_ADDING);
+ ret = __repmgr_update_membership(env, ip, eid, SITE_ADDING,
+ site_info.flags);
break;
case SITE_PRESENT:
/* Already in desired state. */
@@ -729,7 +996,7 @@ serve_join_request(env, ip, msg)
goto err;
LOCK_MUTEX(db_rep->mutex);
- ret = __repmgr_marshal_member_list(env, &buf, &len);
+ ret = __repmgr_marshal_member_list(env, conn->version, &buf, &len);
UNLOCK_MUTEX(db_rep->mutex);
if (ret != 0)
goto err;
@@ -760,6 +1027,7 @@ serve_remove_request(env, ip, msg)
REPMGR_SITE *site;
DBT *dbt;
__repmgr_site_info_args site_info;
+ __repmgr_v4site_info_args v4site_info;
char *host;
u_int32_t status, type;
int eid, ret, t_ret;
@@ -768,9 +1036,18 @@ serve_remove_request(env, ip, msg)
db_rep = env->rep_handle;
conn = msg->v.gmdb_msg.conn;
+ DB_ASSERT(env, conn->version > 0 && conn->version <= DB_REPMGR_VERSION);
dbt = &msg->v.gmdb_msg.request;
- ret = __repmgr_site_info_unmarshal(env,
- &site_info, dbt->data, dbt->size, NULL);
+ if (conn->version < 5) {
+ ret = __repmgr_v4site_info_unmarshal(env,
+ &v4site_info, dbt->data, dbt->size, NULL);
+ site_info.host = v4site_info.host;
+ site_info.port = v4site_info.port;
+ site_info.status = v4site_info.flags;
+ site_info.flags = 0;
+ } else
+ ret = __repmgr_site_info_unmarshal(env,
+ &site_info, dbt->data, dbt->size, NULL);
host = site_info.host.data;
host[site_info.host.size - 1] = '\0';
@@ -810,7 +1087,8 @@ serve_remove_request(env, ip, msg)
break;
case SITE_PRESENT:
case SITE_DELETING:
- ret = __repmgr_update_membership(env, ip, eid, SITE_DELETING);
+ ret = __repmgr_update_membership(env, ip, eid, SITE_DELETING,
+ site_info.flags);
break;
default:
ret = __db_unknown_path(env, "serve_remove_request");
@@ -829,7 +1107,175 @@ err:
default:
return (ret);
}
- return (__repmgr_send_sync_msg(env, conn, type, NULL, 0));
+ /*
+ * It is possible when a site removes itself that by now it has
+ * already acted on the first GMDB update and closed its connections.
+ * Do not return an error and panic if we can't send the final
+ * status of the remove operation.
+ */
+ if ((ret = __repmgr_send_sync_msg(env, conn, type, NULL, 0)) != 0)
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "Problem sending remove site status message %d", ret));
+ return (0);
+}
+
+/*
+ * Serve the REPMGR_RESTART_CLIENT message by restarting this site as a
+ * client if it is not already a client. Always sends back a
+ * REPMGR_PREFMAS_SUCCESS message with an empty payload.
+ */
+static int
+serve_restart_client_request(env, msg)
+ ENV *env;
+ REPMGR_MESSAGE *msg;
+{
+ DB_REP *db_rep;
+ REP * rep;
+ REPMGR_CONNECTION *conn;
+ int ret, t_ret;
+
+ db_rep = env->rep_handle;
+ rep = db_rep->region;
+ ret = 0;
+
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "Serving restart_client request"));
+ conn = msg->v.gmdb_msg.conn;
+ DB_ASSERT(env, conn->version > 0 && conn->version <= DB_REPMGR_VERSION);
+ /* No need to read payload - it is just a dummy byte. */
+
+ if (IS_PREFMAS_MODE(env) && !F_ISSET(rep, REP_F_CLIENT))
+ ret = __repmgr_become_client(env);
+
+ if ((t_ret = __repmgr_send_sync_msg(env, conn,
+ REPMGR_PREFMAS_SUCCESS, NULL, 0)) != 0)
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "Problem sending restart client success message %d", ret));
+
+ if (ret == 0 && t_ret != 0)
+ ret = t_ret;
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "Request for restart_client returning %d", ret));
+ return (ret);
+}
+
+/*
+ * Serve the REPMGR_READONLY_MASTER message by turning this site into a
+ * readonly master. Always sends back a REPMGR_READONLY_RESPONSE message with
+ * a payload containing this site's gen and next LSN expected. If there are
+ * any errors, the gen is 0 and the next LSN is [0,0].
+ */
+static int
+serve_readonly_master_request(env, msg)
+ ENV *env;
+ REPMGR_MESSAGE *msg;
+{
+ REPMGR_CONNECTION *conn;
+ __repmgr_permlsn_args permlsn;
+ u_int8_t buf[__REPMGR_PERMLSN_SIZE];
+ int ret, t_ret;
+
+ ret = 0;
+
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "Serving readonly_master request"));
+ conn = msg->v.gmdb_msg.conn;
+ DB_ASSERT(env, conn->version > 0 && conn->version <= DB_REPMGR_VERSION);
+ /* No need to read payload - it is just a dummy byte. */
+
+ if (IS_PREFMAS_MODE(env))
+ ret = __rep_become_readonly_master(env,
+ &permlsn.generation, &permlsn.lsn);
+
+ __repmgr_permlsn_marshal(env, &permlsn, buf);
+ if ((t_ret = __repmgr_send_sync_msg(env, conn,
+ REPMGR_READONLY_RESPONSE, buf, __REPMGR_PERMLSN_SIZE)) != 0)
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "Problem sending readonly response message %d", ret));
+ if (ret == 0 && t_ret != 0)
+ ret = t_ret;
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "Request for readonly_master returning %d", ret));
+ return (ret);
+}
+
+/*
+ * Serve the REPMGR_LSNHIST_REQUEST message by retrieving information from
+ * this site's LSN history database for the requested gen. If the requested
+ * gen exists at this site, sends back a REPMGR_LSNHIST_RESPONSE message
+ * containing the LSN and timestamp at the requested gen and the LSN for the
+ * next gen if that gen exists (next gen LSN is [0,0] if next gen doesn't
+ * yet exist at this site.) Sends back a PREFMAS_FAILURE message if the
+ * requested gen does not yet exist at this site or if there are any errors.
+ */
+static int
+serve_lsnhist_request(env, ip, msg)
+ ENV *env;
+ DB_THREAD_INFO *ip;
+ REPMGR_MESSAGE *msg;
+{
+ REPMGR_CONNECTION *conn;
+ DBT *dbt;
+ __repmgr_lsnhist_match_args lsnhist_match;
+ __rep_lsn_hist_data_args lsnhist_data, next_lsnhist_data;
+ __rep_lsn_hist_key_args key;
+ u_int8_t match_buf[__REPMGR_LSNHIST_MATCH_SIZE];
+ DB_LSN next_gen_lsn;
+ int ret, t_ret;
+
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC, "Serving lsnhist request"));
+ conn = msg->v.gmdb_msg.conn;
+ DB_ASSERT(env, conn->version > 0 && conn->version <= DB_REPMGR_VERSION);
+ /* Read lsn_hist_key incoming payload to get gen being requested. */
+ dbt = &msg->v.gmdb_msg.request;
+ if ((ret = __rep_lsn_hist_key_unmarshal(env,
+ &key, dbt->data, dbt->size, NULL)) != 0)
+ return (ret);
+ if (key.version != REP_LSN_HISTORY_FMT_VERSION) {
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "serve_lsnhist_request version mismatch"));
+ return (0);
+ }
+
+ /*
+ * There's no need to retry if we don't find an lsnhist record for
+ * requested gen. This site is either a temporary master or a client,
+ * which means that if it doesn't already have an lsnhist record at
+ * this gen, it is highly unlikely to get one in the near future.
+ */
+ if ((ret = __rep_get_lsnhist_data(env,
+ ip, key.gen, &lsnhist_data)) == 0) {
+
+ if ((t_ret = __rep_get_lsnhist_data(env,
+ ip, key.gen + 1, &next_lsnhist_data)) == 0)
+ next_gen_lsn = next_lsnhist_data.lsn;
+ else
+ ZERO_LSN(next_gen_lsn);
+
+ lsnhist_match.lsn = lsnhist_data.lsn;
+ lsnhist_match.hist_sec = lsnhist_data.hist_sec;
+ lsnhist_match.hist_nsec = lsnhist_data.hist_nsec;
+ lsnhist_match.next_gen_lsn = next_gen_lsn;
+ __repmgr_lsnhist_match_marshal(env, &lsnhist_match, match_buf);
+ if ((t_ret = __repmgr_send_sync_msg(env, conn,
+ REPMGR_LSNHIST_RESPONSE, match_buf,
+ __REPMGR_LSNHIST_MATCH_SIZE)) != 0)
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "Problem sending lsnhist response message %d",
+ ret));
+ } else if ((t_ret = __repmgr_send_sync_msg(env, conn,
+ REPMGR_PREFMAS_FAILURE, NULL, 0)) != 0)
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "Problem sending prefmas failure message %d", ret));
+
+ /* Do not return an error if LSN history record not found. */
+ if (ret == DB_NOTFOUND)
+ ret = 0;
+ if (ret == 0 && t_ret != 0)
+ ret = t_ret;
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "Request for lsnhist returning %d", ret));
+ return (ret);
}
/*
@@ -917,7 +1363,13 @@ resolve_limbo_int(env, ip)
if (orig_status == SITE_PRESENT || orig_status == 0)
goto out;
- if (IS_ZERO_LSN(db_rep->limbo_failure))
+ /*
+ * It is possible after an autotakeover on a master to have no
+ * limbo_failure LSN but to have a limbo_victim that was found
+ * in the gmdb that still needs to be resolved.
+ */
+ if (IS_ZERO_LSN(db_rep->limbo_failure) &&
+ !db_rep->limbo_resolution_needed)
goto out;
/*
@@ -947,7 +1399,8 @@ resolve_limbo_int(env, ip)
ip, NULL, &txn, DB_IGNORE_LEASE)) != 0)
goto out;
- marshal_site_data(env, orig_status, data_buf, &data_dbt);
+ marshal_site_data(env,
+ orig_status, site->gmdb_flags, data_buf, &data_dbt);
ret = __db_put(db_rep->gmdb, ip, txn, &key_dbt, &data_dbt, 0);
if ((t_ret = __db_txn_auto_resolve(env, txn, 0, ret)) != 0 &&
@@ -980,15 +1433,15 @@ resolve_limbo_int(env, ip)
UNLOCK_MUTEX(db_rep->mutex);
locked = FALSE;
status = NEXT_STATUS(orig_status);
- if ((ret = finish_gmdb_update(env,
- ip, &key_dbt, orig_status, status, &logrec)) != 0)
+ if ((ret = finish_gmdb_update(env, ip,
+ &key_dbt, orig_status, status, site->gmdb_flags, &logrec)) != 0)
goto out;
/* Track modified membership status in our in-memory sites array. */
LOCK_MUTEX(db_rep->mutex);
locked = TRUE;
if ((ret = __repmgr_set_membership(env,
- addr.host, addr.port, status)) != 0)
+ addr.host, addr.port, status, site->gmdb_flags)) != 0)
goto out;
__repmgr_set_sites(env);
@@ -1005,14 +1458,15 @@ out:
* status is inferred (ADDING -> PRESENT, or DELETING -> 0).
*
* PUBLIC: int __repmgr_update_membership __P((ENV *,
- * PUBLIC: DB_THREAD_INFO *, int, u_int32_t));
+ * PUBLIC: DB_THREAD_INFO *, int, u_int32_t, u_int32_t));
*/
int
-__repmgr_update_membership(env, ip, eid, pstatus)
+__repmgr_update_membership(env, ip, eid, pstatus, site_flags)
ENV *env;
DB_THREAD_INFO *ip;
int eid;
u_int32_t pstatus; /* Provisional status. */
+ u_int32_t site_flags;
{
DB_REP *db_rep;
REPMGR_SITE *site;
@@ -1092,7 +1546,7 @@ retry:
* those seem even more confusing.
*/
if ((ret = __repmgr_set_membership(env,
- addr.host, addr.port, pstatus)) != 0)
+ addr.host, addr.port, pstatus, site_flags)) != 0)
goto err;
__repmgr_set_sites(env);
@@ -1108,7 +1562,7 @@ retry:
if ((ret = __txn_begin(env, ip, NULL, &txn, DB_IGNORE_LEASE)) != 0)
goto err;
marshal_site_key(env, &addr, key_buf, &key_dbt, &logrec);
- marshal_site_data(env, pstatus, status_buf, &data_dbt);
+ marshal_site_data(env, pstatus, site_flags, status_buf, &data_dbt);
if ((ret = __db_put(db_rep->gmdb,
ip, txn, &key_dbt, &data_dbt, 0)) != 0)
goto err;
@@ -1152,13 +1606,14 @@ retry:
locked = FALSE;
if ((ret = finish_gmdb_update(env, ip,
- &key_dbt, pstatus, ult_status, &logrec)) != 0)
+ &key_dbt, pstatus, ult_status, site_flags, &logrec)) != 0)
goto err;
/* Track modified membership status in our in-memory sites array. */
LOCK_MUTEX(db_rep->mutex);
locked = TRUE;
- ret = __repmgr_set_membership(env, addr.host, addr.port, ult_status);
+ ret = __repmgr_set_membership(env, addr.host, addr.port,
+ ult_status, site_flags);
__repmgr_set_sites(env);
err:
@@ -1173,7 +1628,7 @@ err:
* that we keep in sync.
*/
(void)__repmgr_set_membership(env,
- addr.host, addr.port, orig_status);
+ addr.host, addr.port, orig_status, site_flags);
}
if ((t_ret = __repmgr_cleanup_gmdb_op(env, do_close)) != 0 &&
ret == 0)
@@ -1215,13 +1670,14 @@ retry:
UNLOCK_MUTEX(db_rep->mutex);
marshal_site_key(env, &addr, key_buf, &key_dbt, &logrec);
- if ((ret = finish_gmdb_update(env,
- ip, &key_dbt, cur_status, new_status, &logrec)) != 0)
+ if ((ret = finish_gmdb_update(env, ip,
+ &key_dbt, cur_status, new_status, site->gmdb_flags, &logrec)) != 0)
goto err;
/* Track modified membership status in our in-memory sites array. */
LOCK_MUTEX(db_rep->mutex);
- ret = __repmgr_set_membership(env, addr.host, addr.port, new_status);
+ ret = __repmgr_set_membership(env, addr.host, addr.port,
+ new_status, site->gmdb_flags);
__repmgr_set_sites(env);
UNLOCK_MUTEX(db_rep->mutex);
@@ -1301,11 +1757,11 @@ __repmgr_set_gm_version(env, ip, txn, version)
* really deleted.
*/
static int
-finish_gmdb_update(env, ip, key_dbt, prev_status, status, logrec)
+finish_gmdb_update(env, ip, key_dbt, prev_status, status, flags, logrec)
ENV *env;
DB_THREAD_INFO *ip;
DBT *key_dbt;
- u_int32_t prev_status, status;
+ u_int32_t prev_status, status, flags;
__repmgr_member_args *logrec;
{
DB_REP *db_rep;
@@ -1324,7 +1780,7 @@ finish_gmdb_update(env, ip, key_dbt, prev_status, status, logrec)
if (status == 0)
ret = __db_del(db_rep->gmdb, ip, txn, key_dbt, 0);
else {
- marshal_site_data(env, status, data_buf, &data_dbt);
+ marshal_site_data(env, status, flags, data_buf, &data_dbt);
ret = __db_put(db_rep->gmdb, ip, txn, key_dbt, &data_dbt, 0);
}
if (ret != 0)
@@ -1617,16 +2073,18 @@ marshal_site_key(env, addr, buf, dbt, logrec)
}
static void
-marshal_site_data(env, status, buf, dbt)
+marshal_site_data(env, status, flags, buf, dbt)
ENV *env;
u_int32_t status;
+ u_int32_t flags;
u_int8_t *buf;
DBT *dbt;
{
- __repmgr_membership_data_args member_status;
+ __repmgr_membership_data_args member_data;
- member_status.flags = status;
- __repmgr_membership_data_marshal(env, &member_status, buf);
+ member_data.status = status;
+ member_data.flags = flags;
+ __repmgr_membership_data_marshal(env, &member_data, buf);
DB_INIT_DBT(*dbt, buf, __REPMGR_MEMBERSHIP_DATA_SIZE);
}
@@ -1640,16 +2098,107 @@ __repmgr_set_sites(env)
ENV *env;
{
DB_REP *db_rep;
+ REP *rep;
int ret;
u_int32_t n;
u_int i;
db_rep = env->rep_handle;
+ rep = db_rep->region;
for (i = 0, n = 0; i < db_rep->site_cnt; i++) {
- if (db_rep->sites[i].membership > 0)
+ /*
+ * Views do not count towards nsites because they cannot
+ * vote in elections, become master or contribute to
+ * durability.
+ */
+ if (db_rep->sites[i].membership > 0 &&
+ !FLD_ISSET(db_rep->sites[i].gmdb_flags, SITE_VIEW))
n++;
}
ret = __rep_set_nsites_int(env, n);
DB_ASSERT(env, ret == 0);
+ if (FLD_ISSET(rep->config,
+ REP_C_PREFMAS_MASTER | REP_C_PREFMAS_CLIENT) &&
+ rep->config_nsites > 2)
+ __db_errx(env, DB_STR("3701",
+ "More than two sites in preferred master replication group"));
+}
+
+/*
+ * If a site is rejoining a 2-site repgroup with 2SITE_STRICT off
+ * and has a rejection because it needs to catch up with the latest
+ * group membership database, it cannot call an election right away
+ * because it would win with only its own vote and ignore an existing
+ * master in the repgroup. Instead, this routine is used to call the
+ * deferred election after the site has rejoined the repgroup successfully.
+ */
+static int
+rejoin_deferred_election(env)
+ ENV *env;
+{
+ DB_REP *db_rep;
+ u_int32_t flags;
+ int eid, ret;
+
+ db_rep = env->rep_handle;
+ LOCK_MUTEX(db_rep->mutex);
+
+ /*
+ * First, retry all connections so that the election can communicate
+ * with the other sites. Normally there should only be one other
+ * site in the repgroup, but it is safest to retry all remote sites
+ * found in case the group membership changed while we were gone.
+ */
+ FOR_EACH_REMOTE_SITE_INDEX(eid) {
+ if ((ret =
+ __repmgr_schedule_connection_attempt(env, eid, TRUE)) != 0)
+ break;
+ }
+
+ /*
+ * Call an immediate, but not a fast, election because a fast
+ * election reduces the number of votes needed by 1.
+ */
+ flags = ELECT_F_EVENT_NOTIFY;
+ if (FLD_ISSET(db_rep->region->config, REP_C_ELECTIONS))
+ LF_SET(ELECT_F_IMMED);
+ else
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "Deferred rejoin election, but no elections"));
+ ret = __repmgr_init_election(env, flags);
+
+ UNLOCK_MUTEX(db_rep->mutex);
+ return (ret);
+}
+/*
+ * If a site is rejoining a preferred master replication group and has a
+ * rejection because it needs to catch up with the latest group membership
+ * database, it needs to establish its "regular" connection to the other site
+ * so that it can proceed through the preferred master startup sequence.
+ */
+static int
+rejoin_connections(env)
+ ENV *env;
+{
+ DB_REP *db_rep;
+ int eid, ret;
+
+ db_rep = env->rep_handle;
+ ret = 0;
+ LOCK_MUTEX(db_rep->mutex);
+
+ /*
+ * Retry all connections. Normally there should only be one other
+ * site in the repgroup, but it is safest to retry all remote sites
+ * found in case the group membership changed while we were gone.
+ */
+ FOR_EACH_REMOTE_SITE_INDEX(eid) {
+ if ((ret =
+ __repmgr_schedule_connection_attempt(env, eid, TRUE)) != 0)
+ break;
+ }
+
+ UNLOCK_MUTEX(db_rep->mutex);
+ return (ret);
}
diff --git a/src/repmgr/repmgr_net.c b/src/repmgr/repmgr_net.c
index 54e3d066..334fd150 100644
--- a/src/repmgr/repmgr_net.c
+++ b/src/repmgr/repmgr_net.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2005, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2005, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -57,6 +57,7 @@ struct sending_msg {
* whether the PERM message should be considered durable.
*/
struct repmgr_permanence {
+ u_int32_t gen; /* Master generation for LSN. */
DB_LSN lsn; /* LSN whose ack this thread is waiting for. */
u_int threshold; /* Number of client acks to wait for. */
u_int quorum; /* Durability threshold for QUORUM policy. */
@@ -378,7 +379,7 @@ __repmgr_send(dbenv, control, rec, lsnp, eid, flags)
goto out;
#undef SEND_ONE_CONNECTION
- nsites_sent = 1;
+ nsites_sent = FLD_ISSET(site->gmdb_flags, SITE_VIEW) ? 0 : 1;
npeers_sent = F_ISSET(site, SITE_ELECTABLE) ? 1 : 0;
missed_peer = FALSE;
}
@@ -418,7 +419,13 @@ __repmgr_send(dbenv, control, rec, lsnp, eid, flags)
nclients = 0;
else if ((policy == DB_REPMGR_ACKS_ONE ||
policy == DB_REPMGR_ACKS_ONE_PEER) &&
- nclients == 1) {
+ nclients < 2) {
+ /*
+ * Adjust to QUORUM when first other
+ * participant joins (nclients=1) or when there
+ * are no other participants but a view joins
+ * (nclients=0) to get enough acks.
+ */
nclients = 0;
policy = DB_REPMGR_ACKS_QUORUM;
}
@@ -498,9 +505,16 @@ __repmgr_send(dbenv, control, rec, lsnp, eid, flags)
if (nclients > 1 ||
FLD_ISSET(db_rep->region->config,
REP_C_2SITE_STRICT) ||
- db_rep->active_gmdb_update == gmdb_primary)
+ db_rep->active_gmdb_update == gmdb_primary) {
quorum = nclients / 2;
- else
+ /*
+ * An unelectable master can't be part of the
+ * QUORUM policy quorum.
+ */
+ if (rep->priority == 0 &&
+ policy == DB_REPMGR_ACKS_QUORUM)
+ quorum++;
+ } else
quorum = nclients;
if (policy == DB_REPMGR_ACKS_ALL_AVAILABLE) {
@@ -560,6 +574,7 @@ __repmgr_send(dbenv, control, rec, lsnp, eid, flags)
/* In ALL_PEERS case, display of "needed" might be confusing. */
VPRINT(env, (env, DB_VERB_REPMGR_MISC,
"will await acknowledgement: need %u", needed));
+ perm.gen = rep->gen;
perm.lsn = *lsnp;
perm.threshold = needed;
perm.policy = policy;
@@ -734,8 +749,13 @@ __repmgr_send_broadcast(env, type, control, rec, nsitesp, npeersp, missingp)
* useful to keep letting a removed site see updates so that it
* learns of its own removal, and will know to rejoin at its
* next reboot.
+ *
+ * We never count sends to views because views cannot
+ * contribute to durability, but we always do the sends.
*/
- if (site->membership == SITE_PRESENT)
+ if (FLD_ISSET(site->gmdb_flags, SITE_VIEW))
+ full_member = FALSE;
+ else if (site->membership == SITE_PRESENT)
full_member = TRUE;
else {
full_member = FALSE;
@@ -802,7 +822,9 @@ send_connection(env, type, conn, msg, sent)
REPMGR_MAX_V1_MSG_TYPE,
REPMGR_MAX_V2_MSG_TYPE,
REPMGR_MAX_V3_MSG_TYPE,
- REPMGR_MAX_V4_MSG_TYPE
+ REPMGR_MAX_V4_MSG_TYPE,
+ REPMGR_MAX_V5_MSG_TYPE,
+ REPMGR_MAX_V6_MSG_TYPE
};
db_rep = env->rep_handle;
@@ -1132,18 +1154,24 @@ got_acks(env, context)
has_unacked_peer = FALSE;
FOR_EACH_REMOTE_SITE_INDEX(eid) {
site = SITE_FROM_EID(eid);
- if (site->membership != SITE_PRESENT)
+ /*
+ * Do not count an ack from a view because a view cannot
+ * contribute to durability.
+ */
+ if (FLD_ISSET(site->gmdb_flags, SITE_VIEW))
continue;
if (!F_ISSET(site, SITE_HAS_PRIO)) {
/*
- * Never connected to this site: since we can't know
- * whether it's a peer, assume the worst.
+ * We have not reconnected to this site since the last
+ * recovery. Since we don't yet know whether it's a
+ * peer, assume the worst.
*/
has_unacked_peer = TRUE;
continue;
}
- if (LOG_COMPARE(&site->max_ack, &perm->lsn) >= 0) {
+ if (site->max_ack_gen == perm->gen &&
+ LOG_COMPARE(&site->max_ack, &perm->lsn) >= 0) {
sites_acked++;
if (F_ISSET(site, SITE_ELECTABLE))
peers_acked++;
@@ -1206,6 +1234,7 @@ __repmgr_bust_connection(env, conn)
DB_REP *db_rep;
REP *rep;
REPMGR_SITE *site;
+ db_timespec now;
u_int32_t flags;
int ret, eid;
@@ -1259,7 +1288,9 @@ __repmgr_bust_connection(env, conn)
} else /* Subordinate connection. */
goto out;
- if ((ret = __repmgr_schedule_connection_attempt(env, eid, FALSE)) != 0)
+ /* Defer connection attempt if rejoining 2SITE_STRICT=off repgroup. */
+ if (!db_rep->rejoin_pending &&
+ (ret = __repmgr_schedule_connection_attempt(env, eid, FALSE)) != 0)
goto out;
/*
@@ -1267,11 +1298,47 @@ __repmgr_bust_connection(env, conn)
* master, assume that the master may have failed, and call for
* an election. But only do this for the connection to the main
* master process, not a subordinate one. And only do it if
- * we're our site's main process, not a subordinate one. And
+ * we're our site's listener process, not a subordinate one. And
* skip it if the application has configured us not to do
* elections.
*/
if (!IS_SUBORDINATE(db_rep) && eid == rep->master_id) {
+ if (FLD_ISSET(rep->config, REP_C_AUTOTAKEOVER)) {
+ /*
+ * When the connection is from master's listener, if
+ * there is any other connection from a master's
+ * subordinate process that could take over as
+ * listener, we delay the election to allow some time
+ * for a new master listener to start. At the end of
+ * the delay, if there is still no master listener,
+ * call an election. There is a slight chance that
+ * we will delay the election to wait for an inactive
+ * connection which would never become the next main
+ * connection.
+ */
+ TAILQ_FOREACH(conn, &site->sub_conns, entries) {
+ if (conn->auto_takeover) {
+ if (!timespecisset(
+ &db_rep->m_listener_chk)) {
+ __os_gettime(env, &now, 1);
+ TIMESPEC_ADD_DB_TIMEOUT(&now,
+ db_rep->m_listener_wait);
+ db_rep->m_listener_chk = now;
+ }
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "Master failure, but delay elections for takeover on master"));
+ return (0);
+ }
+ }
+ }
+
+ /* Defer election if rejoining 2SITE_STRICT=off repgroup. */
+ if (db_rep->rejoin_pending) {
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "Deferring election after rejoin rejection"));
+ goto out;
+ }
+
/*
* Even if we're not doing elections, defer the event
* notification to later execution in the election
@@ -1285,6 +1352,17 @@ __repmgr_bust_connection(env, conn)
RPRINT(env, (env, DB_VERB_REPMGR_MISC,
"Master failure, but no elections"));
+ /*
+ * In preferred master mode, a client that has lost its
+ * connection to the master uses an election thread to
+ * restart as master.
+ */
+ if (IS_PREFMAS_MODE(env)) {
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+"bust_connection setting preferred master temp master"));
+ db_rep->prefmas_pending = start_temp_master;
+ }
+
if ((ret = __repmgr_init_election(env, flags)) != 0)
goto out;
}
@@ -1340,25 +1418,59 @@ __repmgr_disable_connection(env, conn)
REPMGR_CONNECTION *conn;
{
DB_REP *db_rep;
- REPMGR_SITE *site;
+ REP *rep;
REPMGR_RESPONSE *resp;
+ REPMGR_SITE *site;
+ SITEINFO *sites;
u_int32_t i;
- int eid, ret, t_ret;
+ int eid, is_subord, orig_state, ret, t_ret;
db_rep = env->rep_handle;
+ rep = db_rep->region;
ret = 0;
+ is_subord = 0;
+ orig_state = conn->state;
conn->state = CONN_DEFUNCT;
if (conn->type == REP_CONNECTION) {
eid = conn->eid;
if (IS_VALID_EID(eid)) {
site = SITE_FROM_EID(eid);
if (conn != site->ref.conn.in &&
- conn != site->ref.conn.out)
- /* It's a subordinate connection. */
+ conn != site->ref.conn.out) {
+ /*
+ * It is a subordinate connection to disable.
+ * Remove it from the subordinate connection
+ * list, and decrease the number of listener
+ * candidates by 1 if it is from a subordinate
+ * rep-aware process that allows takeover.
+ */
TAILQ_REMOVE(&site->sub_conns, conn, entries);
+ SET_LISTENER_CAND(conn->auto_takeover, --);
+ is_subord = 1;
+ }
TAILQ_INSERT_TAIL(&db_rep->connections, conn, entries);
conn->ref_count++;
+ /*
+ * Do not decrease sites_avail for a subordinate
+ * connection.
+ */
+ if (site->state == SITE_CONNECTED && !is_subord &&
+ (orig_state == CONN_READY ||
+ orig_state == CONN_CONGESTED)) {
+ /*
+ * Some thread orderings can cause a brief
+ * dip into a negative sites_avail value.
+ * Once it goes negative it stays negative,
+ * so avoid this. Future connections will
+ * be counted correctly.
+ */
+ if (rep->sites_avail > 0)
+ rep->sites_avail--;
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "disable_conn: EID %lu disabled. sites_avail %lu",
+ (u_long)eid, (u_long)rep->sites_avail));
+ }
}
conn->eid = -1;
} else if (conn->type == APP_CONNECTION) {
@@ -1646,8 +1758,10 @@ flatten(env, msg)
}
/*
- * Scan the list of remote sites, returning the first one that is a peer,
- * is not the current master, and is available.
+ * Scan the list of remote sites, returning the first participant that is a
+ * peer, is not the current master, and is available. If there are no
+ * available participant peers but there is an available view peer, return the
+ * first available view peer.
*/
static REPMGR_SITE *
__repmgr_find_available_peer(env)
@@ -1656,23 +1770,28 @@ __repmgr_find_available_peer(env)
DB_REP *db_rep;
REP *rep;
REPMGR_CONNECTION *conn;
- REPMGR_SITE *site;
- u_int i;
+ REPMGR_SITE *site, *view;
+ u_int avail, i;
db_rep = env->rep_handle;
rep = db_rep->region;
+ view = NULL;
FOR_EACH_REMOTE_SITE_INDEX(i) {
site = &db_rep->sites[i];
- if (FLD_ISSET(site->config, DB_REPMGR_PEER) &&
- EID_FROM_SITE(site) != rep->master_id &&
- site->state == SITE_CONNECTED &&
+ avail = (site->state == SITE_CONNECTED &&
(((conn = site->ref.conn.in) != NULL &&
conn->state == CONN_READY) ||
((conn = site->ref.conn.out) != NULL &&
- conn->state == CONN_READY)))
+ conn->state == CONN_READY)));
+ if (FLD_ISSET(site->config, DB_REPMGR_PEER) &&
+ !FLD_ISSET(site->gmdb_flags, SITE_VIEW) &&
+ EID_FROM_SITE(site) != rep->master_id && avail)
return (site);
+ if (!view && FLD_ISSET(site->config, DB_REPMGR_PEER) &&
+ FLD_ISSET(site->gmdb_flags, SITE_VIEW) && avail)
+ view = site;
}
- return (NULL);
+ return (view);
}
/*
@@ -1852,6 +1971,7 @@ __repmgr_net_close(env)
site->ref.conn.out = NULL;
}
}
+ rep->sites_avail = 0;
if (db_rep->listen_fd != INVALID_SOCKET) {
if (closesocket(db_rep->listen_fd) == SOCKET_ERROR && ret == 0)
@@ -1870,22 +1990,28 @@ final_cleanup(env, conn, unused)
void *unused;
{
DB_REP *db_rep;
+ REP *rep;
REPMGR_SITE *site;
- int ret, t_ret;
+ SITEINFO *sites;
+ int eid, ret, t_ret;
COMPQUIET(unused, NULL);
db_rep = env->rep_handle;
+ rep = db_rep->region;
+ eid = conn->eid;
ret = __repmgr_close_connection(env, conn);
/* Remove the connection from whatever list it's on, if any. */
- if (conn->type == REP_CONNECTION && IS_VALID_EID(conn->eid)) {
- site = SITE_FROM_EID(conn->eid);
+ if (conn->type == REP_CONNECTION && IS_VALID_EID(eid)) {
+ site = SITE_FROM_EID(eid);
if (site->state == SITE_CONNECTED &&
(conn == site->ref.conn.in || conn == site->ref.conn.out)) {
/* Not on any list, so no need to do anything. */
- } else
+ } else {
TAILQ_REMOVE(&site->sub_conns, conn, entries);
+ SET_LISTENER_CAND(conn->auto_takeover, --);
+ }
t_ret = __repmgr_destroy_conn(env, conn);
} else {
diff --git a/src/repmgr/repmgr_posix.c b/src/repmgr/repmgr_posix.c
index 0687681a..c49017ff 100644
--- a/src/repmgr/repmgr_posix.c
+++ b/src/repmgr/repmgr_posix.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2005, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2005, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
diff --git a/src/repmgr/repmgr_queue.c b/src/repmgr/repmgr_queue.c
index 6a381acf..3a51b32b 100644
--- a/src/repmgr/repmgr_queue.c
+++ b/src/repmgr/repmgr_queue.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2006, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2006, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -22,13 +22,28 @@ __repmgr_queue_destroy(env)
ENV *env;
{
DB_REP *db_rep;
+ REP *rep;
REPMGR_MESSAGE *m;
REPMGR_CONNECTION *conn;
+ u_int32_t mtype;
int ret, t_ret;
+ COMPQUIET(mtype, 0);
+
db_rep = env->rep_handle;
+ rep = db_rep->region;
ret = 0;
+
+ /*
+ * Turn on the DB_EVENT_REP_INQUEUE_FULL event firing. We only do
+ * this for the main listener process. For a subordinate process,
+ * it is always turned on.
+ */
+ if (!STAILQ_EMPTY(&db_rep->input_queue.header) &&
+ !IS_SUBORDINATE(db_rep))
+ rep->inqueue_full_event_on = 1;
+
while (!STAILQ_EMPTY(&db_rep->input_queue.header)) {
m = STAILQ_FIRST(&db_rep->input_queue.header);
STAILQ_REMOVE_HEAD(&db_rep->input_queue.header, entries);
@@ -38,8 +53,25 @@ __repmgr_queue_destroy(env)
ret == 0)
ret = t_ret;
}
+ if (m->msg_hdr.type == REPMGR_OWN_MSG) {
+ mtype = REPMGR_OWN_MSG_TYPE(m->msg_hdr);
+ if ((conn = m->v.gmdb_msg.conn) != NULL) {
+ /*
+ * A site that removed itself may have already
+ * closed its connections.
+ */
+ if ((t_ret = __repmgr_close_connection(env,
+ conn)) != 0 && ret == 0 &&
+ mtype != REPMGR_REMOVE_REQUEST)
+ ret = t_ret;
+ if ((t_ret = __repmgr_decr_conn_ref(env,
+ conn)) != 0 && ret == 0)
+ ret = t_ret;
+ }
+ }
__os_free(env, m);
}
+
return (ret);
}
@@ -60,14 +92,17 @@ __repmgr_queue_get(env, msgp, th)
REPMGR_RUNNABLE *th;
{
DB_REP *db_rep;
+ REP *rep;
REPMGR_MESSAGE *m;
#ifdef DB_WIN32
HANDLE wait_events[2];
#endif
+ u_int32_t msgsize;
int ret;
ret = 0;
db_rep = env->rep_handle;
+ rep = db_rep->region;
while ((m = available_work(env)) == NULL &&
db_rep->repmgr_status == running && !th->quit_requested) {
@@ -104,10 +139,42 @@ __repmgr_queue_get(env, msgp, th)
else {
STAILQ_REMOVE(&db_rep->input_queue.header,
m, __repmgr_message, entries);
- db_rep->input_queue.size--;
+ msgsize = (u_int32_t)m->size;
+ while (msgsize >= GIGABYTE) {
+ DB_ASSERT(env, db_rep->input_queue.gbytes > 0);
+ db_rep->input_queue.gbytes--;
+ msgsize -= GIGABYTE;
+ }
+ if (db_rep->input_queue.bytes < msgsize) {
+ DB_ASSERT(env, db_rep->input_queue.gbytes > 0);
+ db_rep->input_queue.gbytes--;
+ db_rep->input_queue.bytes += GIGABYTE;
+ }
+ db_rep->input_queue.bytes -= msgsize;
+
+ /*
+ * Check if current size is out of the red zone.
+ * If it is, we will turn on the DB_EVENT_REP_INQUEUE_FULL
+ * event firing.
+ *
+ * We only have the redzone machanism for the main listener
+ * process.
+ */
+ if (!IS_SUBORDINATE(db_rep) &&
+ rep->inqueue_full_event_on == 0) {
+ MUTEX_LOCK(env, rep->mtx_repmgr);
+ if (db_rep->input_queue.gbytes <
+ rep->inqueue_rz_gbytes ||
+ (db_rep->input_queue.gbytes ==
+ rep->inqueue_rz_gbytes &&
+ db_rep->input_queue.bytes <
+ rep->inqueue_rz_bytes))
+ rep->inqueue_full_event_on = 1;
+ MUTEX_UNLOCK(env, rep->mtx_repmgr);
+ }
+
*msgp = m;
}
-
err:
return (ret);
}
@@ -157,24 +224,55 @@ __repmgr_queue_put(env, msg)
REPMGR_MESSAGE *msg;
{
DB_REP *db_rep;
+ REP *rep;
+ u_int32_t msgsize;
db_rep = env->rep_handle;
+ rep = db_rep->region;
+
+ /*
+ * Drop message if incoming queue contains more messages than the
+ * limit. See dbenv->repmgr_set_incoming_queue_max() for more
+ * information.
+ */
+ MUTEX_LOCK(env, rep->mtx_repmgr);
+ if (db_rep->input_queue.gbytes > rep->inqueue_max_gbytes ||
+ (db_rep->input_queue.gbytes == rep->inqueue_max_gbytes &&
+ db_rep->input_queue.bytes >= rep->inqueue_max_bytes)) {
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "incoming queue limit exceeded"));
+ STAT(rep->mstat.st_incoming_msgs_dropped++);
+ if (IS_SUBORDINATE(db_rep) || rep->inqueue_full_event_on) {
+ DB_EVENT(env, DB_EVENT_REP_INQUEUE_FULL, NULL);
+ /*
+ * We will always disable the event firing after
+ * the queue is full. It will be enabled again
+ * after the incoming queue size is out of the
+ * redzone.
+ *
+ * We only have the redzone machanism for the main
+ * listener process.
+ */
+ if (!IS_SUBORDINATE(db_rep))
+ rep->inqueue_full_event_on = 0;
+ }
+ MUTEX_UNLOCK(env, rep->mtx_repmgr);
+ __os_free(env, msg);
+ return (0);
+ }
+ MUTEX_UNLOCK(env, rep->mtx_repmgr);
STAILQ_INSERT_TAIL(&db_rep->input_queue.header, msg, entries);
- db_rep->input_queue.size++;
+ msgsize = (u_int32_t)msg->size;
+ while (msgsize >= GIGABYTE) {
+ msgsize -= GIGABYTE;
+ db_rep->input_queue.gbytes++;
+ }
+ db_rep->input_queue.bytes += msgsize;
+ if (db_rep->input_queue.bytes >= GIGABYTE) {
+ db_rep->input_queue.gbytes++;
+ db_rep->input_queue.bytes -= GIGABYTE;
+ }
return (__repmgr_signal(&db_rep->msg_avail));
}
-
-/*
- * PUBLIC: int __repmgr_queue_size __P((ENV *));
- *
- * !!!
- * Caller must hold repmgr->mutex.
- */
-int
-__repmgr_queue_size(env)
- ENV *env;
-{
- return (env->rep_handle->input_queue.size);
-}
diff --git a/src/repmgr/repmgr_rec.c b/src/repmgr/repmgr_rec.c
index 41827aff..568df45d 100644
--- a/src/repmgr/repmgr_rec.c
+++ b/src/repmgr/repmgr_rec.c
@@ -1,3 +1,11 @@
+/*-
+ * See the file LICENSE for redistribution information.
+ *
+ * Copyright (c) 2014, 2015 Oracle and/or its affiliates. All rights reserved.
+ *
+ * $Id$
+ */
+
#include "db_config.h"
#include "db_int.h"
@@ -31,7 +39,7 @@ __repmgr_member_recover(env, dbtp, lsnp, op, info)
/*
* The annotation log record describes the update in enough detail for
- * us to be able to optimize our tracking of it at clients sites.
+ * us to be able to optimize our tracking of it at client sites.
* However, for now we just simply reread the whole (small) database
* each time, since changes happen so seldom (and we need to have the
* code for reading the whole thing anyway, for other cases).
diff --git a/src/repmgr/repmgr_sel.c b/src/repmgr/repmgr_sel.c
index ba14368f..c32dad25 100644
--- a/src/repmgr/repmgr_sel.c
+++ b/src/repmgr/repmgr_sel.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2006, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2006, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -12,7 +12,7 @@
typedef int (*HEARTBEAT_ACTION) __P((ENV *));
-static int accept_handshake __P((ENV *, REPMGR_CONNECTION *, char *));
+static int accept_handshake __P((ENV *, REPMGR_CONNECTION *, char *, int *));
static int accept_v1_handshake __P((ENV *, REPMGR_CONNECTION *, char *));
static void check_min_log_file __P((ENV *));
static int dispatch_msgin __P((ENV *, REPMGR_CONNECTION *));
@@ -23,13 +23,18 @@ static int process_parameters __P((ENV *,
static int read_version_response __P((ENV *, REPMGR_CONNECTION *));
static int record_permlsn __P((ENV *, REPMGR_CONNECTION *));
static int __repmgr_call_election __P((ENV *));
+static int __repmgr_check_listener __P((ENV *));
+static int __repmgr_check_master_listener __P((ENV *));
static int __repmgr_connector_main __P((ENV *, REPMGR_RUNNABLE *));
static void *__repmgr_connector_thread __P((void *));
static int __repmgr_next_timeout __P((ENV *,
db_timespec *, HEARTBEAT_ACTION *));
+static int __repmgr_reset_last_rcvd __P((ENV *));
static int __repmgr_retry_connections __P((ENV *));
static int __repmgr_send_heartbeat __P((ENV *));
-static int __repmgr_try_one __P((ENV *, int));
+static int __repmgr_start_takeover __P((ENV *));
+static void *__repmgr_takeover_thread __P((void *));
+static int __repmgr_try_one __P((ENV *, int, int));
static int resolve_collision __P((ENV *, REPMGR_SITE *, REPMGR_CONNECTION *));
static int send_version_response __P((ENV *, REPMGR_CONNECTION *));
@@ -49,17 +54,24 @@ void *
__repmgr_select_thread(argsp)
void *argsp;
{
- REPMGR_RUNNABLE *args;
ENV *env;
+ DB_THREAD_INFO *ip;
int ret;
+ REPMGR_RUNNABLE *args;
args = argsp;
env = args->env;
+ ip = NULL;
+ ret = 0;
- if ((ret = __repmgr_select_loop(env)) != 0) {
+ ENV_ENTER_RET(env, ip, ret);
+ if (ret != 0 || (ret = __repmgr_select_loop(env)) != 0) {
__db_err(env, ret, DB_STR("3614", "select loop failed"));
+ ENV_LEAVE(env, ip);
(void)__repmgr_thread_failure(env, ret);
}
+ if (ret == 0)
+ ENV_LEAVE(env, ip);
return (NULL);
}
@@ -71,12 +83,19 @@ __repmgr_bow_out(env)
ENV *env;
{
DB_REP *db_rep;
+ REP *rep;
int ret;
db_rep = env->rep_handle;
+ rep = db_rep->region;
LOCK_MUTEX(db_rep->mutex);
ret = __repmgr_stop_threads(env);
UNLOCK_MUTEX(db_rep->mutex);
+ /*
+ * Reset sites_avail so that it will be calculated correctly if this
+ * site rejoins the group in the future.
+ */
+ rep->sites_avail = 0;
DB_EVENT(env, DB_EVENT_REP_LOCAL_SITE_REMOVED, NULL);
return (ret);
}
@@ -187,23 +206,53 @@ __repmgr_compute_timeout(env, timeout)
db_rep = env->rep_handle;
/*
- * There are two factors to consider: are heartbeats in use? and, do we
+ * There are four factors to consider: are heartbeats in use? do we
* have any sites with broken connections that we ought to retry?
+ * is there a listener process running locally? do we need to call
+ * an election if no master listener exists?
*/
have_timeout = __repmgr_next_timeout(env, &t, NULL);
/* List items are in order, so we only have to examine the first one. */
if (!TAILQ_EMPTY(&db_rep->retries)) {
retry = TAILQ_FIRST(&db_rep->retries);
- if (have_timeout) {
+ if (have_timeout)
/* Choose earliest timeout deadline. */
t = timespeccmp(&retry->time, &t, <) ? retry->time : t;
- } else {
+ else {
t = retry->time;
have_timeout = TRUE;
}
}
+ /* Check listener every timeout in subordinate rep-aware process. */
+ if (IS_LISTENER_CAND(db_rep)) {
+ if (!timespecisset(&db_rep->l_listener_chk)) {
+ __os_gettime(env, &now, 1);
+ TIMESPEC_ADD_DB_TIMEOUT(&now, db_rep->l_listener_wait);
+ db_rep->l_listener_chk = now;
+ }
+ if (have_timeout)
+ t = timespeccmp(&db_rep->l_listener_chk, &t, <) ?
+ db_rep->l_listener_chk : t;
+ else {
+ t = db_rep->l_listener_chk;
+ have_timeout = TRUE;
+ }
+ }
+
+ /* Check master listener if needed. */
+ if (FLD_ISSET(db_rep->region->config, REP_C_AUTOTAKEOVER) &&
+ timespecisset(&db_rep->m_listener_chk)) {
+ if (have_timeout)
+ t = timespeccmp(&db_rep->m_listener_chk, &t, <) ?
+ db_rep->m_listener_chk : t;
+ else {
+ t = db_rep->m_listener_chk;
+ have_timeout = TRUE;
+ }
+ }
+
if (have_timeout) {
__os_gettime(env, &now, 1);
if (timespeccmp(&now, &t, >=))
@@ -242,7 +291,17 @@ __repmgr_next_timeout(env, deadline, action)
if (rep->master_id == db_rep->self_eid &&
rep->heartbeat_frequency > 0) {
- t = db_rep->last_bcast;
+ /*
+ * A temporary master in preferred master mode must send
+ * regular heartbeats regardless of other activity because
+ * the preferred master requires a heartbeat to take over as
+ * master after it has synced with the temporary master.
+ */
+ if (IS_PREFMAS_MODE(env) &&
+ FLD_ISSET(rep->config, REP_C_PREFMAS_CLIENT))
+ t = db_rep->last_hbeat;
+ else
+ t = db_rep->last_bcast;
TIMESPEC_ADD_DB_TIMEOUT(&t, rep->heartbeat_frequency);
my_action = __repmgr_send_heartbeat;
} else if ((master = __repmgr_connected_master(env)) != NULL &&
@@ -301,6 +360,24 @@ __repmgr_send_heartbeat(env)
db_rep = env->rep_handle;
rep = db_rep->region;
+ ret = 0;
+
+ /*
+ * Check test hook preventing heartbeats and connection attempts.
+ * This is used to create and maintain a dupmaster condition in
+ * a test until the test hook is rescinded.
+ */
+ DB_TEST_SET(env->test_abort, DB_TEST_REPMGR_HEARTBEAT);
+
+ /*
+ * Track last heartbeat for temporary master in preferred master
+ * mode so that it will send regular heartbeats regardless of
+ * other activity.
+ */
+ if (IS_PREFMAS_MODE(env) &&
+ FLD_ISSET(rep->config, REP_C_PREFMAS_CLIENT) &&
+ rep->master_id == db_rep->self_eid)
+ __os_gettime(env, &db_rep->last_hbeat, 1);
permlsn.generation = rep->gen;
if ((ret = __rep_get_maxpermlsn(env, &permlsn.lsn)) != 0)
@@ -310,8 +387,11 @@ __repmgr_send_heartbeat(env)
control.size = __REPMGR_PERMLSN_SIZE;
DB_INIT_DBT(rec, NULL, 0);
- return (__repmgr_send_broadcast(env,
- REPMGR_HEARTBEAT, &control, &rec, &unused1, &unused2, &unused3));
+ ret =__repmgr_send_broadcast(env,
+ REPMGR_HEARTBEAT, &control, &rec, &unused1, &unused2, &unused3);
+
+DB_TEST_RECOVERY_LABEL
+ return (ret);
}
/*
@@ -373,6 +453,8 @@ __repmgr_check_timeouts(env)
HEARTBEAT_ACTION action;
int ret;
+ ret = 0;
+
/*
* Figure out the next heartbeat-related thing to be done. Then, if
* it's time to do it, do so.
@@ -384,7 +466,342 @@ __repmgr_check_timeouts(env)
return (ret);
}
- return (__repmgr_retry_connections(env));
+ /* Check the existence of local listener. */
+ if ((ret = __repmgr_check_listener(env)) != 0)
+ return (ret);
+
+ /* Check the existence of master listener. */
+ if ((ret = __repmgr_check_master_listener(env)) != 0)
+ return (ret);
+
+ /*
+ * Check test hook preventing heartbeats and connection attempts.
+ * This is used to create and maintain a dupmaster condition in
+ * a test until the test hook is rescinded.
+ */
+ DB_TEST_SET(env->test_abort, DB_TEST_REPMGR_HEARTBEAT);
+
+ ret = __repmgr_retry_connections(env);
+
+DB_TEST_RECOVERY_LABEL
+ return (ret);
+}
+
+/*
+ * Check the existence of the listener process on the local site. If one
+ * does not exist and the current process is a subordinate rep-aware process,
+ * then start a takeover thread to covert this process to the listener process.
+ */
+static int
+__repmgr_check_listener(env)
+ ENV *env;
+{
+ DB_REP *db_rep;
+ REP *rep;
+ SITEINFO *sites;
+ db_timespec t;
+ int ret;
+
+ db_rep = env->rep_handle;
+ rep = db_rep->region;
+ ret = 0;
+
+ /*
+ * Only subordinate rep-aware process can take over listener role, so
+ * no need to check listener in listener process or rep unaware process.
+ */
+ if (!IS_LISTENER_CAND(db_rep))
+ return (0);
+
+ /*
+ * If the listener quits due to site removal, no subordinate process
+ * should take over as listener as the current site is not expected
+ * to be active in the group. Check the status from the site array
+ * in the shared region instead of that in the GMDB. We do this
+ * because the GMDB doesn't apply the change yet when replication
+ * is stopped on the removed site.
+ */
+ sites = R_ADDR(env->reginfo, rep->siteinfo_off);
+ if (sites[rep->self_eid].status == SITE_DELETING)
+ return (0);
+
+ /*
+ * Check the listener after timeout. If there is no listener, we
+ * take over. During takeover, we will refresh all connections.
+ * A subordinate process does not have an up-to-date site list, so sync
+ * up addresses from the in-memory site array before takeover.
+ */
+ __os_gettime(env, &t, 1);
+ if (timespeccmp(&t, &db_rep->l_listener_chk, >=)) {
+ /* Compute the next timeout. */
+ TIMESPEC_ADD_DB_TIMEOUT(&t, db_rep->l_listener_wait);
+ db_rep->l_listener_chk = t;
+
+ /* Check if site address information needs to be refreshed. */
+ if ((rep->siteinfo_seq > db_rep->siteinfo_seq) &&
+ (ret = __repmgr_sync_siteaddr(env)) != 0)
+ return (ret);
+
+ if (rep->listener == 0)
+ ret = __repmgr_start_takeover(env);
+ }
+ return (ret);
+}
+
+/*
+ * Start a thread to take over the listener role in the current subordinate
+ * process.
+ */
+static int
+__repmgr_start_takeover(env)
+ ENV *env;
+{
+ DB_REP *db_rep;
+ REPMGR_RUNNABLE *th;
+ int ret;
+
+ db_rep = env->rep_handle;
+ th = db_rep->takeover_thread;
+ if (th == NULL) {
+ if ((ret = __os_calloc(env, 1, sizeof(REPMGR_RUNNABLE),
+ &th)) != 0)
+ return (ret);
+ db_rep->takeover_thread = th;
+ } else if (th->finished) {
+ if ((ret = __repmgr_thread_join(th)) != 0)
+ return (ret);
+ } else {
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "takeover thread still running"));
+ return (0);
+ }
+ th->run = __repmgr_takeover_thread;
+ if ((ret = __repmgr_thread_start(env, th)) != 0) {
+ __os_free(env, th);
+ db_rep->takeover_thread = NULL;
+ }
+ return (ret);
+}
+
+/*
+ * Take over listener role in the current subordinate process.
+ */
+static void *
+__repmgr_takeover_thread(argsp)
+ void *argsp;
+{
+ DB_REP *db_rep;
+ DB_THREAD_INFO *ip;
+ ENV *env;
+ REP *rep;
+ REPMGR_RUNNABLE *th;
+ int nthreads, ret, save_policy;
+
+ th = argsp;
+ env = th->env;
+ db_rep = env->rep_handle;
+ ip = NULL;
+ rep = db_rep->region;
+ ret = 0;
+
+ ENV_ENTER_RET(env, ip, ret);
+ if (ret != 0)
+ goto out;
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC, "starting takeover thread"));
+ /*
+ * It is likely that there is an old heartbeat ready to expire
+ * immediately upon restarting repmgr, leading to an unnecessary
+ * election. Reset the expiration countdown here to avoid this.
+ */
+ if ((ret = __repmgr_reset_last_rcvd(env)) != 0)
+ goto out;
+ /*
+ * If nthreads is set to be 0 in the current subordinate process, use
+ * the value in the last listener. The nthreads should be larger than
+ * 0 in listener.
+ */
+ nthreads = db_rep->config_nthreads == 0 ? (int)rep->listener_nthreads :
+ db_rep->config_nthreads;
+ /*
+ * It is possible that this subordinate process does not have intact
+ * connections to the other sites. For most ack policies, restarting
+ * repmgr will wait for acks when it commits its transaction to reload
+ * the gmdb. Temporarily set the ack policy to NONE for the takeover
+ * so that it is not delayed waiting for acks that can never come.
+ */
+ save_policy = rep->perm_policy;
+ rep->perm_policy = DB_REPMGR_ACKS_NONE;
+ /*
+ * Restart the repmgr as listener. If DB_REP_IGNORE is returned,
+ * the current process has become listener. If DB_REP_UNAVAIL is
+ * returned, the site has been removed from the group and no listener
+ * should be started. For any other error, if the replication is
+ * stopped because of the takeover thread, we will notify the
+ * application.
+ */
+ ret = __repmgr_start_int(env, nthreads, F_ISSET(rep, REP_F_MASTER) ?
+ DB_REP_MASTER : DB_REP_CLIENT);
+ if (ret == 0 && !IS_SUBORDINATE(db_rep) &&
+ db_rep->repmgr_status == running) {
+ STAT(rep->mstat.st_takeovers++);
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "finished takeover and became listener"));
+ } else if (ret != 0 && db_rep->repmgr_status == stopped) {
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "failed to take over, repmgr was stopped"));
+ DB_EVENT(env, DB_EVENT_REP_AUTOTAKEOVER_FAILED, NULL);
+ } else {
+ /* The current process is not changed to listener. */
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC, "failed to take over"));
+ }
+ rep->perm_policy = save_policy;
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC, "takeover thread is exiting"));
+ ENV_LEAVE(env, ip);
+out: th->finished = TRUE;
+ return (NULL);
+}
+
+/*
+ * Reset the last_rcvd_timestamp to restart the wait for a heartbeat
+ * monitor expiration.
+ */
+static int
+__repmgr_reset_last_rcvd(env)
+ ENV *env;
+{
+ DB_REP *db_rep;
+ REPMGR_SITE *master;
+
+ db_rep = env->rep_handle;
+
+ LOCK_MUTEX(db_rep->mutex);
+ if ((master = __repmgr_connected_master(env)) != NULL)
+ __os_gettime(env, &master->last_rcvd_timestamp, 1);
+ UNLOCK_MUTEX(db_rep->mutex);
+ return (0);
+}
+
+/*
+ * Monitor the connection to master listener. When the master listener is
+ * disconnected and some other master process might take over as listener
+ * soon, we will delay the election. After the delay if there is still no
+ * connection from master listener, call an election then.
+ */
+static int
+__repmgr_check_master_listener(env)
+ ENV *env;
+{
+ DB_REP *db_rep;
+ REP *rep;
+ REPMGR_SITE *master;
+ db_timespec t;
+ u_int32_t flags;
+ int ret;
+
+ db_rep = env->rep_handle;
+ rep = db_rep->region;
+ ret = 0;
+
+ /*
+ * We only check for a master listener if m_listener_chk is set.
+ * The field is only set when __repmgr_bust_connection() previously
+ * detected the loss of our connection to the master listener.
+ * If rep->master_id is invalid, wait until it is ready to check.
+ */
+ if (!FLD_ISSET((db_rep)->region->config, REP_C_AUTOTAKEOVER) ||
+ !timespecisset(&db_rep->m_listener_chk) ||
+ !IS_VALID_EID(rep->master_id))
+ return (0);
+
+ __os_gettime(env, &t, 1);
+ if (timespeccmp(&t, &db_rep->m_listener_chk, >=)) {
+ master = SITE_FROM_EID(db_rep->region->master_id);
+ if (master->ref.conn.out == NULL &&
+ master->ref.conn.in == NULL) {
+ flags = ELECT_F_EVENT_NOTIFY;
+ if (FLD_ISSET(db_rep->region->config, REP_C_ELECTIONS))
+ LF_SET(ELECT_F_IMMED | ELECT_F_FAST);
+ else
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "Master failure, but no elections"));
+
+ /*
+ * In preferred master mode, a client that has lost its
+ * connection to the master uses an election thread to
+ * restart as master.
+ */
+ if (IS_PREFMAS_MODE(env)) {
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+"check_master_listener setting preferred master temp master"));
+ db_rep->prefmas_pending = start_temp_master;
+ }
+
+ ret = __repmgr_init_election(env, flags);
+ }
+ /*
+ * If the delay has expired reset m_listener_chk. We reset
+ * it whether or not the master listener process comes back
+ * so that we will not continue checking for a master listener
+ * indefinitely.
+ */
+ timespecclear(&db_rep->m_listener_chk);
+ }
+ return (ret);
+}
+
+/*
+ * Wake up I/O waiting in selector thread, refresh connections to all connected
+ * and present sites.
+ *
+ * PUBLIC: int __repmgr_refresh_selector __P((ENV *));
+ */
+int
+__repmgr_refresh_selector(env)
+ ENV *env;
+{
+ DB_REP *db_rep;
+ REP *rep;
+ REPMGR_RETRY *retry;
+ REPMGR_SITE *site;
+ SITEINFO *sites;
+ int eid, ret;
+
+ db_rep = env->rep_handle;
+ rep = db_rep->region;
+
+ if ((ret = __repmgr_wake_main_thread(env)) != 0)
+ return (ret);
+
+ FOR_EACH_REMOTE_SITE_INDEX(eid) {
+ SET_LISTENER_CAND(1, = 0);
+ site = SITE_FROM_EID(eid);
+
+ /*
+ * It is possible some sites were left in a paused state
+ * during the switch, so they have to be removed from the
+ * retry list.
+ */
+ if (site->state == SITE_PAUSING) {
+ retry = site->ref.retry;
+ if (retry != NULL) {
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "Removing site from retry list eid %lu",
+ (u_long)eid));
+ TAILQ_REMOVE(&db_rep->retries, retry, entries);
+ __os_free(env, retry);
+ site->ref.retry = NULL;
+ }
+
+ }
+ /*
+ * Try to connect to any site that is now PRESENT after
+ * rereading the gmdb.
+ */
+ if (site->membership == SITE_PRESENT &&
+ (ret = __repmgr_try_one(env, eid, TRUE)) != 0)
+ return (ret);
+ }
+ return (0);
}
/*
@@ -415,10 +832,11 @@ __repmgr_retry_connections(env)
__os_free(env, retry);
DB_ASSERT(env, IS_VALID_EID(eid));
site = SITE_FROM_EID(eid);
+ site->ref.retry = NULL;
DB_ASSERT(env, site->state == SITE_PAUSING);
if (site->membership == SITE_PRESENT) {
- if ((ret = __repmgr_try_one(env, eid)) != 0)
+ if ((ret = __repmgr_try_one(env, eid, FALSE)) != 0)
return (ret);
} else
site->state = SITE_IDLE;
@@ -437,11 +855,23 @@ __repmgr_first_try_connections(env)
ENV *env;
{
DB_REP *db_rep;
+ REP *rep;
REPMGR_SITE *site;
+ SITEINFO *sites;
int eid, ret;
db_rep = env->rep_handle;
+ rep = db_rep->region;
+
+ /*
+ * Check test hook preventing heartbeats and connection attempts.
+ * This is used to create and maintain a dupmaster condition in
+ * a test until the test hook is rescinded.
+ */
+ DB_TEST_SET(env->test_abort, DB_TEST_REPMGR_HEARTBEAT);
+
FOR_EACH_REMOTE_SITE_INDEX(eid) {
+ SET_LISTENER_CAND(1, = 0);
site = SITE_FROM_EID(eid);
/*
* Normally all sites would be IDLE here. But if a user thread
@@ -453,19 +883,22 @@ __repmgr_first_try_connections(env)
*/
if (site->state == SITE_IDLE &&
site->membership == SITE_PRESENT &&
- (ret = __repmgr_try_one(env, eid)) != 0)
+ (ret = __repmgr_try_one(env, eid, FALSE)) != 0)
return (ret);
}
+DB_TEST_RECOVERY_LABEL
return (0);
}
/*
- * Starts a thread to open a connection to the site at the given EID.
+ * Starts a thread to open a connection to the site at the given EID. We might
+ * have no connection to the site, or an existing connection to be replaced.
*/
static int
-__repmgr_try_one(env, eid)
+__repmgr_try_one(env, eid, refresh)
ENV *env;
int eid;
+ int refresh;
{
DB_REP *db_rep;
REPMGR_SITE *site;
@@ -488,13 +921,22 @@ __repmgr_try_one(env, eid)
"eid %lu previous connector thread still running; will retry",
(u_long)eid));
return (__repmgr_schedule_connection_attempt(env,
- eid, FALSE));
+ eid, refresh));
}
site->state = SITE_CONNECTING;
th->run = __repmgr_connector_thread;
- th->args.eid = eid;
+ th->args.conn_th.eid = eid;
+ /*
+ * The flag CONNECT_F_REFRESH indicates an immediate connection attempt
+ * should be scheduled if the current connection attempt fails. It is
+ * turned on before the first attempt to refresh the connection but
+ * turned off if the first attempt fails. In this way, when refreshing
+ * the connection, there will be at most two immediate connection
+ * attempts, after that, retry as usual.
+ */
+ th->args.conn_th.flags = refresh ? CONNECT_F_REFRESH : 0;
if ((ret = __repmgr_thread_start(env, th)) != 0) {
__os_free(env, th);
site->connector = NULL;
@@ -506,21 +948,33 @@ static void *
__repmgr_connector_thread(argsp)
void *argsp;
{
- REPMGR_RUNNABLE *th;
ENV *env;
+ DB_THREAD_INFO *ip;
+ REPMGR_RUNNABLE *th;
int ret;
th = argsp;
env = th->env;
+ ip = NULL;
+ ret = 0;
- RPRINT(env, (env, DB_VERB_REPMGR_MISC,
- "starting connector thread, eid %u", th->args.eid));
- if ((ret = __repmgr_connector_main(env, th)) != 0) {
+ ENV_ENTER_RET(env, ip, ret);
+ if (ret == 0)
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "starting connector thread, eid %u",
+ th->args.conn_th.eid));
+ if (ret != 0 || (ret = __repmgr_connector_main(env, th)) != 0) {
__db_err(env, ret, DB_STR("3617", "connector thread failed"));
+ RPRINT(env, (env,
+ DB_VERB_REPMGR_MISC, "connector thread is exiting"));
+ ENV_LEAVE(env, ip);
(void)__repmgr_thread_failure(env, ret);
}
- RPRINT(env, (env, DB_VERB_REPMGR_MISC, "connector thread is exiting"));
-
+ if (ret == 0) {
+ RPRINT(env, (env,
+ DB_VERB_REPMGR_MISC, "connector thread is exiting"));
+ ENV_LEAVE(env, ip);
+ }
th->finished = TRUE;
return (NULL);
}
@@ -542,8 +996,8 @@ __repmgr_connector_main(env, th)
ret = 0;
LOCK_MUTEX(db_rep->mutex);
- DB_ASSERT(env, IS_VALID_EID(th->args.eid));
- site = SITE_FROM_EID(th->args.eid);
+ DB_ASSERT(env, IS_VALID_EID(th->args.conn_th.eid));
+ site = SITE_FROM_EID(th->args.conn_th.eid);
if (site->state != SITE_CONNECTING && db_rep->repmgr_status == stopped)
goto unlock;
@@ -563,7 +1017,8 @@ __repmgr_connector_main(env, th)
UNLOCK_MUTEX(db_rep->mutex);
if ((ret = __repmgr_connect(env, &netaddr, &conn, &err)) == 0) {
- DB_EVENT(env, DB_EVENT_REP_CONNECT_ESTD, &th->args.eid);
+ DB_EVENT(env,
+ DB_EVENT_REP_CONNECT_ESTD, &th->args.conn_th.eid);
LOCK_MUTEX(db_rep->mutex);
if ((ret = __repmgr_set_nonblock_conn(conn)) != 0) {
__db_err(env, ret, DB_STR("3618",
@@ -571,33 +1026,53 @@ __repmgr_connector_main(env, th)
goto cleanup;
}
conn->type = REP_CONNECTION;
- site = SITE_FROM_EID(th->args.eid);
+ site = SITE_FROM_EID(th->args.conn_th.eid);
if (site->state != SITE_CONNECTING ||
db_rep->repmgr_status == stopped)
goto cleanup;
- conn->eid = th->args.eid;
- site = SITE_FROM_EID(th->args.eid);
- site->ref.conn.out = conn;
+ conn->eid = th->args.conn_th.eid;
+ site = SITE_FROM_EID(th->args.conn_th.eid);
+ /*
+ * If there is an existing outgoing connection, disable it and
+ * replace it with a new connection. The sites for a formerly
+ * subordinate handle that is now taking over might still be
+ * SITE_CONNECTING. Set to SITE_CONNECTED before disabling
+ * connection so that sites_avail is correctly maintained.
+ */
site->state = SITE_CONNECTED;
+ if (site->ref.conn.out != NULL)
+ (void)__repmgr_disable_connection(env,
+ site->ref.conn.out);
+ site->ref.conn.out = conn;
__os_gettime(env, &site->last_rcvd_timestamp, 1);
ret = __repmgr_wake_main_thread(env);
} else if (ret == DB_REP_UNAVAIL) {
/* Retryable error while trying to connect: retry later. */
- info.eid = th->args.eid;
+ info.eid = th->args.conn_th.eid;
info.error = err;
DB_EVENT(env, DB_EVENT_REP_CONNECT_TRY_FAILED, &info);
STAT(db_rep->region->mstat.st_connect_fail++);
LOCK_MUTEX(db_rep->mutex);
- site = SITE_FROM_EID(th->args.eid);
+ site = SITE_FROM_EID(th->args.conn_th.eid);
if (site->state != SITE_CONNECTING ||
db_rep->repmgr_status == stopped) {
ret = 0;
goto unlock;
}
+ /*
+ * If it fails to create a new outgoing connection to replace
+ * the existing one in the first attempt, schedule another
+ * immediate attempt. If it is our second attempt, disable
+ * the existing connections and retry as normal.
+ */
+ if (site->ref.conn.out != NULL && th->args.conn_th.flags == 0)
+ (void)__repmgr_disable_connection(env,
+ site->ref.conn.out);
ret = __repmgr_schedule_connection_attempt(env,
- th->args.eid, FALSE);
+ th->args.conn_th.eid,
+ th->args.conn_th.flags == CONNECT_F_REFRESH);
} else
goto out;
@@ -842,6 +1317,7 @@ prepare_input(env, conn)
if ((ret = __os_malloc(env, memsize, &membase)) != 0)
return (ret);
conn->input.rep_message = membase;
+ conn->input.rep_message->size = memsize;
conn->input.rep_message->msg_hdr = msg_hdr;
conn->input.rep_message->v.repmsg.originating_eid = conn->eid;
@@ -876,6 +1352,7 @@ prepare_input(env, conn)
if ((ret = __os_malloc(env, memsize, &membase)) != 0)
return (ret);
conn->input.rep_message = membase;
+ conn->input.rep_message->size = memsize;
conn->input.rep_message->msg_hdr = msg_hdr;
conn->input.rep_message->v.appmsg.conn = conn;
@@ -891,6 +1368,7 @@ prepare_input(env, conn)
if ((ret = __os_malloc(env, size, &membase)) != 0)
return (ret);
conn->input.rep_message = membase;
+ conn->input.rep_message->size = size;
conn->input.rep_message->msg_hdr = msg_hdr;
/*
@@ -1065,16 +1543,18 @@ dispatch_msgin(env, conn)
ENV *env;
REPMGR_CONNECTION *conn;
{
+ DBT *dbt;
DB_REP *db_rep;
- REPMGR_SITE *site;
- REPMGR_RUNNABLE *th;
+ REP *rep;
REPMGR_RESPONSE *resp;
- DBT *dbt;
+ REPMGR_RUNNABLE *th;
+ REPMGR_SITE *site;
char *hostname;
- int eid, ret;
+ int eid, ret, subord;
DB_ASSERT(env, conn->reading_phase == DATA_PHASE);
db_rep = env->rep_handle;
+ rep = db_rep->region;
switch (conn->state) {
case CONN_CONNECTED:
@@ -1129,9 +1609,22 @@ dispatch_msgin(env, conn)
dbt = &conn->input.repmgr_msg.rec;
hostname = dbt->data;
hostname[dbt->size-1] = '\0';
- if ((ret = accept_handshake(env, conn, hostname)) != 0)
+ if ((ret = accept_handshake(env,
+ conn, hostname, &subord)) != 0)
return (ret);
conn->state = CONN_READY;
+ site = SITE_FROM_EID(conn->eid);
+ /*
+ * Do not increase sites_avail redundantly for an
+ * incoming subordinate connection.
+ */
+ if (conn->type == REP_CONNECTION &&
+ site->state == SITE_CONNECTED && !subord) {
+ rep->sites_avail++;
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "msgin: EID %lu CONNECTED, READY. sites_avail %lu",
+ (u_long)conn->eid, (u_long)rep->sites_avail));
+ }
break;
case REPMGR_OWN_MSG:
/*
@@ -1279,9 +1772,11 @@ process_own_msg(env, conn)
REPMGR_SITE *site;
REPMGR_MESSAGE *msg;
__repmgr_connect_reject_args reject;
+ __repmgr_v4connect_reject_args v4reject;
__repmgr_parm_refresh_args parms;
int ret;
+ db_rep = env->rep_handle;
ret = 0;
/*
* Set "msg" to point to the message struct. If we do all necessary
@@ -1293,28 +1788,61 @@ process_own_msg(env, conn)
switch (REPMGR_OWN_MSG_TYPE((msg = conn->input.rep_message)->msg_hdr)) {
case REPMGR_CONNECT_REJECT:
dbt = &msg->v.gmdb_msg.request;
- if ((ret = __repmgr_connect_reject_unmarshal(env,
- &reject, dbt->data, dbt->size, NULL)) != 0)
- return (DB_REP_UNAVAIL);
+ if (conn->version < 5) {
+ if ((ret = __repmgr_v4connect_reject_unmarshal(env,
+ &v4reject, dbt->data, dbt->size, NULL)) != 0)
+ return (DB_REP_UNAVAIL);
+ reject.version = v4reject.version;
+ reject.gen = v4reject.gen;
+ reject.status = 0;
+ } else {
+ if ((ret = __repmgr_connect_reject_unmarshal(env,
+ &reject, dbt->data, dbt->size, NULL)) != 0)
+ return (DB_REP_UNAVAIL);
+ }
/*
* If we're being rejected by someone who has more up-to-date
- * membership information than we do, it means we have been
- * removed from the group. If we've just gotten started, we can
- * make one attempt at automatically rejoining; otherwise we bow
- * out gracefully.
+ * membership information than we do, it means we are not in
+ * the group. If we've just gotten started, or our status is
+ * adding, we can make one attempt at automatically rejoining;
+ * otherwise we bow out gracefully.
*/
RPRINT(env, (env, DB_VERB_REPMGR_MISC,
- "got rejection msg citing version %lu/%lu",
- (u_long)reject.gen, (u_long)reject.version));
+ "got rejection msg citing version %lu/%lu mine %lu/%lu membership %lu",
+ (u_long)reject.gen, (u_long)reject.version,
+ (u_long)db_rep->member_version_gen,
+ (u_long)db_rep->membership_version,
+ (u_long)reject.status));
if (__repmgr_gmdb_version_cmp(env,
reject.gen, reject.version) > 0) {
- if (env->rep_handle->seen_repmsg)
+ if (db_rep->seen_repmsg && reject.status != SITE_ADDING)
ret = DB_DELETED;
- else if ((ret = __repmgr_defer_op(env,
- REPMGR_REJOIN)) == 0)
- ret = DB_REP_UNAVAIL;
+ else {
+ /*
+ * If 2SITE_STRICT is off, we are likely to
+ * win an election with our own vote before
+ * discovering there is already a master.
+ * Set indicator to defer the election until
+ * after rejoining group.
+ *
+ * In preferred master mode, either site
+ * should defer the election (which
+ * executes the preferred master startup
+ * code and only calls an election if it is
+ * safe) and also avoid scheduling an extra
+ * reconnect attempt in bust_connection()
+ * by setting the indicator.
+ */
+ if (!FLD_ISSET(db_rep->region->config,
+ REP_C_2SITE_STRICT) ||
+ IS_PREFMAS_MODE(env))
+ db_rep->rejoin_pending = TRUE;
+ if ((ret = __repmgr_defer_op(env,
+ REPMGR_REJOIN)) == 0)
+ ret = DB_REP_UNAVAIL;
+ }
} else
ret = DB_REP_UNAVAIL;
DB_ASSERT(env, ret != 0);
@@ -1332,7 +1860,6 @@ process_own_msg(env, conn)
if ((ret = __repmgr_parm_refresh_unmarshal(env,
&parms, dbt->data, dbt->size, NULL)) != 0)
return (DB_REP_UNAVAIL);
- db_rep = env->rep_handle;
DB_ASSERT(env, conn->type == REP_CONNECTION &&
IS_KNOWN_REMOTE_SITE(conn->eid));
site = SITE_FROM_EID(conn->eid);
@@ -1348,8 +1875,15 @@ process_own_msg(env, conn)
case REPMGR_GM_FORWARD:
case REPMGR_JOIN_REQUEST:
case REPMGR_JOIN_SUCCESS:
+ case REPMGR_LSNHIST_REQUEST:
+ case REPMGR_LSNHIST_RESPONSE:
+ case REPMGR_PREFMAS_FAILURE:
+ case REPMGR_PREFMAS_SUCCESS:
+ case REPMGR_READONLY_MASTER:
+ case REPMGR_READONLY_RESPONSE:
case REPMGR_REMOVE_REQUEST:
case REPMGR_RESOLVE_LIMBO:
+ case REPMGR_RESTART_CLIENT:
default:
__db_errx(env, DB_STR_A("3677",
"unexpected msg type %lu in process_own_msg", "%lu"),
@@ -1482,6 +2016,8 @@ __repmgr_send_handshake(env, conn, opt, optlen, flags)
cntrl_len = __REPMGR_V3HANDSHAKE_SIZE;
break;
case 4:
+ case 5:
+ case 6:
cntrl_len = __REPMGR_HANDSHAKE_SIZE;
break;
default:
@@ -1513,6 +2049,8 @@ __repmgr_send_handshake(env, conn, opt, optlen, flags)
__repmgr_v3handshake_marshal(env, &v3hs, p);
break;
case 4:
+ case 5:
+ case 6:
hs.port = my_addr->port;
hs.alignment = MEM_ALIGN;
hs.ack_policy = (u_int32_t)rep->perm_policy;
@@ -1551,11 +2089,14 @@ read_version_response(env, conn)
DB_REP *db_rep;
__repmgr_version_confirmation_args conf;
DBT vi;
+ REP *rep;
+ REPMGR_SITE *site;
char *hostname;
u_int32_t flags;
- int ret;
+ int ret, subord;
db_rep = env->rep_handle;
+ rep = db_rep->region;
if ((ret = __repmgr_find_version_info(env, conn, &vi)) != 0)
return (ret);
@@ -1581,14 +2122,37 @@ read_version_response(env, conn)
return (DB_REP_UNAVAIL);
}
- if ((ret = accept_handshake(env, conn, hostname)) != 0)
+ if ((ret = accept_handshake(env, conn, hostname, &subord)) != 0)
return (ret);
- flags = IS_SUBORDINATE(db_rep) ? REPMGR_SUBORDINATE : 0;
+ if (!IS_SUBORDINATE(db_rep))
+ flags = 0;
+ else {
+ flags = REPMGR_SUBORDINATE;
+ if (FLD_ISSET(rep->config, REP_C_AUTOTAKEOVER) &&
+ db_rep->repmgr_status == running)
+ /*
+ * Takeover is enabled in rep-aware subordinate
+ * process.
+ */
+ flags |= REPMGR_AUTOTAKEOVER;
+ }
if ((ret = __repmgr_send_handshake(env,
conn, NULL, 0, flags)) != 0)
return (ret);
}
conn->state = CONN_READY;
+ site = SITE_FROM_EID(conn->eid);
+ /*
+ * Do not increase sites_avail redundantly for a new outgoing
+ * connection from a subordinate process.
+ */
+ if (conn->type == REP_CONNECTION &&
+ site->state == SITE_CONNECTED && !IS_SUBORDINATE(db_rep)) {
+ rep->sites_avail++;
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "vers_resp: EID %lu CONNECTED, READY. sites_avail %lu",
+ (u_long)conn->eid, (u_long)rep->sites_avail));
+ }
return (ret);
}
@@ -1641,10 +2205,11 @@ __repmgr_find_version_info(env, conn, vi)
}
static int
-accept_handshake(env, conn, hostname)
+accept_handshake(env, conn, hostname, subordinate)
ENV *env;
REPMGR_CONNECTION *conn;
char *hostname;
+ int *subordinate;
{
__repmgr_handshake_args hs;
__repmgr_v2handshake_args hs2;
@@ -1653,6 +2218,7 @@ accept_handshake(env, conn, hostname)
u_int32_t ack, flags;
int electable;
+ *subordinate = 0;
switch (conn->version) {
case 2:
if (__repmgr_v2handshake_unmarshal(env, &hs2,
@@ -1674,6 +2240,8 @@ accept_handshake(env, conn, hostname)
ack = 0;
break;
case 4:
+ case 5:
+ case 6:
if (__repmgr_handshake_unmarshal(env, &hs,
conn->input.repmgr_msg.cntrl.data,
conn->input.repmgr_msg.cntrl.size, NULL) != 0)
@@ -1682,6 +2250,8 @@ accept_handshake(env, conn, hostname)
electable = F_ISSET(&hs, ELECTABLE_SITE);
flags = hs.flags;
ack = hs.ack_policy;
+ if (LF_ISSET(REPMGR_SUBORDINATE))
+ *subordinate = 1;
break;
default:
__db_errx(env, DB_STR_A("3679",
@@ -1729,13 +2299,17 @@ process_parameters(env, conn, host, port, ack, electable, flags)
u_int32_t ack, flags;
{
DB_REP *db_rep;
+ REP *rep;
REPMGR_RETRY *retry;
REPMGR_SITE *site;
+ SITEINFO *sites;
__repmgr_connect_reject_args reject;
+ __repmgr_v4connect_reject_args v4reject;
u_int8_t reject_buf[__REPMGR_CONNECT_REJECT_SIZE];
int eid, ret;
db_rep = env->rep_handle;
+ rep = db_rep->region;
/* Connection state can be used to discern incoming versus outgoing. */
if (conn->state == CONN_CONNECTED) {
@@ -1785,6 +2359,13 @@ process_parameters(env, conn, host, port, ack, electable, flags)
TAILQ_INSERT_TAIL(&site->sub_conns,
conn, entries);
conn->eid = eid;
+ conn->auto_takeover =
+ LF_ISSET(REPMGR_AUTOTAKEOVER) ? 1 : 0;
+ SET_LISTENER_CAND(conn->auto_takeover, ++);
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "handshake from subordinate %sconnection at site %s:%u EID %u",
+ LF_ISSET(REPMGR_AUTOTAKEOVER)?
+ "takeover ": "", host, port, eid));
} else {
DB_EVENT(env,
DB_EVENT_REP_CONNECT_ESTD, &eid);
@@ -1797,6 +2378,7 @@ process_parameters(env, conn, host, port, ack, electable, flags)
TAILQ_REMOVE(&db_rep->retries,
retry, entries);
__os_free(env, retry);
+ site->ref.retry = NULL;
break;
case SITE_CONNECTED:
/*
@@ -1821,6 +2403,16 @@ process_parameters(env, conn, host, port, ack, electable, flags)
* don't have to do anything else here.
*/
break;
+ case SITE_IDLE:
+ /*
+ * This can occur after the heartbeat
+ * test hook artificially kept this
+ * site from first trying to connect.
+ */
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "handshake from idle site %s:%u EID %u",
+ host, port, eid));
+ break;
default:
DB_ASSERT(env, FALSE);
}
@@ -1834,10 +2426,18 @@ process_parameters(env, conn, host, port, ack, electable, flags)
RPRINT(env, (env, DB_VERB_REPMGR_MISC,
"rejecting connection from unknown or provisional site %s:%u",
host, port));
- reject.version = db_rep->membership_version;
- reject.gen = db_rep->member_version_gen;
- __repmgr_connect_reject_marshal(env,
- &reject, reject_buf);
+ if (conn->version < 5) {
+ v4reject.version = db_rep->membership_version;
+ v4reject.gen = db_rep->member_version_gen;
+ __repmgr_v4connect_reject_marshal(env,
+ &v4reject, reject_buf);
+ } else {
+ reject.version = db_rep->membership_version;
+ reject.gen = db_rep->member_version_gen;
+ reject.status = (site) ? site->membership : 0;
+ __repmgr_connect_reject_marshal(env,
+ &reject, reject_buf);
+ }
if ((ret = __repmgr_send_own_msg(env, conn,
REPMGR_CONNECT_REJECT, reject_buf,
@@ -1867,7 +2467,8 @@ process_parameters(env, conn, host, port, ack, electable, flags)
*/
if (!IS_SUBORDINATE(db_rep) && /* us */
!__repmgr_master_is_known(env) &&
- !LF_ISSET(REPMGR_SUBORDINATE)) { /* the remote site */
+ !LF_ISSET(REPMGR_SUBORDINATE) && /* the remote site */
+ !IS_PREFMAS_MODE(env)) {
RPRINT(env, (env, DB_VERB_REPMGR_MISC,
"handshake with no known master to wake election thread"));
db_rep->new_connection = TRUE;
@@ -1980,6 +2581,7 @@ record_permlsn(env, conn)
*/
if (ackp->lsn.file > site->max_ack.file)
do_log_check = 1;
+ site->max_ack_gen = ackp->generation;
memcpy(&site->max_ack, &ackp->lsn, sizeof(DB_LSN));
if (do_log_check)
check_min_log_file(env);
diff --git a/src/repmgr/repmgr_stat.c b/src/repmgr/repmgr_stat.c
index fd6dabd3..215f4719 100644
--- a/src/repmgr/repmgr_stat.c
+++ b/src/repmgr/repmgr_stat.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2005, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2005, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -55,7 +55,9 @@ __repmgr_stat(env, statp, flags)
{
DB_REP *db_rep;
DB_REPMGR_STAT *copy, *stats;
- uintmax_t tmp;
+ REPMGR_SITE *site;
+ u_int32_t tmp;
+ u_int i;
int ret;
db_rep = env->rep_handle;
@@ -73,6 +75,20 @@ __repmgr_stat(env, statp, flags)
memset(stats, 0, sizeof(DB_REPMGR_STAT));
stats->st_max_elect_threads = tmp;
}
+ stats->st_incoming_queue_gbytes = db_rep->input_queue.gbytes;
+ stats->st_incoming_queue_bytes = db_rep->input_queue.bytes;
+ LOCK_MUTEX(db_rep->mutex);
+ for (i = 0; i < db_rep->site_cnt; i++) {
+ site = SITE_FROM_EID(i);
+ if (site->membership != 0) {
+ copy->st_site_total++;
+ if (FLD_ISSET(site->gmdb_flags, SITE_VIEW))
+ copy->st_site_views++;
+ else
+ copy->st_site_participants++;
+ }
+ }
+ UNLOCK_MUTEX(db_rep->mutex);
*statp = copy;
return (0);
@@ -148,6 +164,11 @@ __repmgr_print_stats(env, flags)
(u_long)sp->st_msgs_queued);
__db_dl(env, "Number of messages discarded due to queue length",
(u_long)sp->st_msgs_dropped);
+ __db_dlbytes(env, "Incoming message size in queue",
+ (u_long)sp->st_incoming_queue_gbytes, (u_long)0,
+ (u_long)sp->st_incoming_queue_bytes);
+ __db_dl(env, "Number of messages discarded due to incoming queue full",
+ (u_long)sp->st_incoming_msgs_dropped);
__db_dl(env, "Number of existing connections dropped",
(u_long)sp->st_connection_drop);
__db_dl(env, "Number of failed new connection attempts",
@@ -156,6 +177,14 @@ __repmgr_print_stats(env, flags)
(u_long)sp->st_elect_threads);
__db_dl(env, "Election threads for which space is reserved",
(u_long)sp->st_max_elect_threads);
+ __db_dl(env, "Number of participant sites in replication group",
+ (u_long)sp->st_site_participants);
+ __db_dl(env, "Total number of sites in replication group",
+ (u_long)sp->st_site_total);
+ __db_dl(env, "Number of view sites in replication group",
+ (u_long)sp->st_site_views);
+ __db_dl(env, "Number of automatic replication process takeovers",
+ (u_long)sp->st_takeovers);
__os_ufree(env, sp);
@@ -171,7 +200,7 @@ __repmgr_print_sites(env)
u_int count, i;
int ret;
- if ((ret = __repmgr_site_list(env->dbenv, &count, &list)) != 0)
+ if ((ret = __repmgr_site_list_int(env, &count, &list)) != 0)
return (ret);
if (count == 0)
@@ -189,6 +218,9 @@ __repmgr_print_sites(env)
list[i].status == DB_REPMGR_CONNECTED ? "" : "dis");
__db_msgadd(env, &mb, ", %speer",
F_ISSET(&list[i], DB_REPMGR_ISPEER) ? "" : "non-");
+ __db_msgadd(env, &mb, ", %s",
+ F_ISSET(&list[i], DB_REPMGR_ISVIEW) ?
+ "view" : "participant");
__db_msgadd(env, &mb, ")");
DB_MSGBUF_FLUSH(env, &mb);
}
@@ -238,26 +270,46 @@ __repmgr_stat_print_pp(dbenv, flags)
#endif
/*
- * PUBLIC: int __repmgr_site_list __P((DB_ENV *, u_int *, DB_REPMGR_SITE **));
+ * PUBLIC: int __repmgr_site_list_pp
+ * PUBLIC: __P((DB_ENV *, u_int *, DB_REPMGR_SITE **));
*/
int
-__repmgr_site_list(dbenv, countp, listp)
+__repmgr_site_list_pp(dbenv, countp, listp)
DB_ENV *dbenv;
u_int *countp;
DB_REPMGR_SITE **listp;
{
- DB_REP *db_rep;
- REP *rep;
- DB_REPMGR_SITE *status;
ENV *env;
DB_THREAD_INFO *ip;
+ int ret;
+
+ env = dbenv->env;
+
+ ENV_ENTER(env, ip);
+ ret = __repmgr_site_list_int(env, countp, listp);
+ ENV_LEAVE(env, ip);
+
+ return (ret);
+}
+
+/*
+ * PUBLIC: int __repmgr_site_list_int __P((ENV *, u_int *, DB_REPMGR_SITE **));
+ */
+int
+__repmgr_site_list_int(env, countp, listp)
+ ENV *env;
+ u_int *countp;
+ DB_REPMGR_SITE **listp;
+{
+ DB_REP *db_rep;
+ DB_REPMGR_SITE *status;
+ REP *rep;
REPMGR_SITE *site;
size_t array_size, total_size;
int eid, locked, ret;
u_int count, i;
char *name;
- env = dbenv->env;
db_rep = env->rep_handle;
ret = 0;
@@ -269,10 +321,8 @@ __repmgr_site_list(dbenv, countp, listp)
LOCK_MUTEX(db_rep->mutex);
locked = TRUE;
- ENV_ENTER(env, ip);
if (rep->siteinfo_seq > db_rep->siteinfo_seq)
ret = __repmgr_sync_siteaddr(env);
- ENV_LEAVE(env, ip);
if (ret != 0)
goto err;
} else {
@@ -329,6 +379,8 @@ __repmgr_site_list(dbenv, countp, listp)
if (FLD_ISSET(site->config, DB_REPMGR_PEER))
F_SET(&status[i], DB_REPMGR_ISPEER);
+ if (FLD_ISSET(site->gmdb_flags, SITE_VIEW))
+ F_SET(&status[i], DB_REPMGR_ISVIEW);
/*
* If we haven't started a communications thread, connection
diff --git a/src/repmgr/repmgr_stub.c b/src/repmgr/repmgr_stub.c
index 734c2240..999b759f 100644
--- a/src/repmgr/repmgr_stub.c
+++ b/src/repmgr/repmgr_stub.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 1996, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 1996, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -73,6 +73,69 @@ __repmgr_set_ack_policy(dbenv, policy)
/*
* PUBLIC: #ifndef HAVE_REPLICATION_THREADS
+ * PUBLIC: int __repmgr_get_incoming_queue_max __P((DB_ENV *, u_int32_t *,
+ * PUBLIC: u_int32_t *));
+ * PUBLIC: #endif
+ */
+int
+__repmgr_get_incoming_queue_max(dbenv, messagesp, bulk_messagesp)
+ DB_ENV *dbenv;
+ u_int32_t *messagesp;
+ u_int32_t *bulk_messagesp;
+{
+ COMPQUIET(messagesp, NULL);
+ COMPQUIET(bulk_messagesp, NULL);
+ return (__db_norepmgr(dbenv));
+}
+
+/*
+ * PUBLIC: #ifndef HAVE_REPLICATION_THREADS
+ * PUBLIC: int __repmgr_set_incoming_queue_max __P((DB_ENV *, u_int32_t,
+ * PUBLIC: u_int32_t));
+ * PUBLIC: #endif
+ */
+int
+__repmgr_set_incoming_queue_max(dbenv, messages, bulk_messages)
+ DB_ENV *dbenv;
+ u_int32_t messages;
+ u_int32_t bulk_messages;
+{
+ COMPQUIET(messages, 0);
+ COMPQUIET(bulk_messages, 0);
+ return (__db_norepmgr(dbenv));
+}
+
+/*
+ * PUBLIC: #ifndef HAVE_REPLICATION_THREADS
+ * PUBLIC: int __repmgr_get_incoming_queue_redzone __P((DB_ENV *,
+ * PUBLIC: u_int32_t *, u_int32_t *));
+ * PUBLIC: #endif
+ */
+int __repmgr_get_incoming_queue_redzone(dbenv, gbytesp, bytesp)
+ DB_ENV *dbenv;
+ u_int32_t *gbytesp, *bytesp;
+{
+ COMPQUIET(gbytesp, NULL);
+ COMPQUIET(bytesp, NULL);
+ return (__db_norepmgr(dbenv));
+}
+
+/*
+ * PUBLIC: #ifndef HAVE_REPLICATION_THREADS
+ * PUBLIC: int __repmgr_get_incoming_queue_fullevent __P((DB_ENV *,
+ * PUBLIC: int *));
+ * PUBLIC: #endif
+ */
+int __repmgr_get_incoming_queue_fullevent(dbenv, onoffp)
+ DB_ENV *dbenv;
+ int *onoffp;
+{
+ COMPQUIET(onoffp, NULL);
+ return (__db_norepmgr(dbenv));
+}
+
+/*
+ * PUBLIC: #ifndef HAVE_REPLICATION_THREADS
* PUBLIC: int __repmgr_site
* PUBLIC: __P((DB_ENV *, const char *, u_int, DB_SITE **, u_int32_t));
* PUBLIC: #endif
@@ -125,11 +188,12 @@ __repmgr_local_site(dbenv, dbsitep)
/*
* PUBLIC: #ifndef HAVE_REPLICATION_THREADS
- * PUBLIC: int __repmgr_site_list __P((DB_ENV *, u_int *, DB_REPMGR_SITE **));
+ * PUBLIC: int __repmgr_site_list_pp
+ * PUBLIC: __P((DB_ENV *, u_int *, DB_REPMGR_SITE **));
* PUBLIC: #endif
*/
int
-__repmgr_site_list(dbenv, countp, listp)
+__repmgr_site_list_pp(dbenv, countp, listp)
DB_ENV *dbenv;
u_int *countp;
DB_REPMGR_SITE **listp;
@@ -141,11 +205,11 @@ __repmgr_site_list(dbenv, countp, listp)
/*
* PUBLIC: #ifndef HAVE_REPLICATION_THREADS
- * PUBLIC: int __repmgr_start __P((DB_ENV *, int, u_int32_t));
+ * PUBLIC: int __repmgr_start_pp __P((DB_ENV *, int, u_int32_t));
* PUBLIC: #endif
*/
int
-__repmgr_start(dbenv, nthreads, flags)
+__repmgr_start_pp(dbenv, nthreads, flags)
DB_ENV *dbenv;
int nthreads;
u_int32_t flags;
diff --git a/src/repmgr/repmgr_util.c b/src/repmgr/repmgr_util.c
index c2439436..1c5ebe59 100644
--- a/src/repmgr/repmgr_util.c
+++ b/src/repmgr/repmgr_util.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2005, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2005, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -15,9 +15,13 @@
#define INITIAL_SITES_ALLOCATION 3 /* Arbitrary guess. */
+static int convert_gmdb(ENV *, DB_THREAD_INFO *, DB *, DB_TXN *);
static int get_eid __P((ENV *, const char *, u_int, int *));
-static int __repmgr_addrcmp __P((repmgr_netaddr_t *, repmgr_netaddr_t *));
static int read_gmdb __P((ENV *, DB_THREAD_INFO *, u_int8_t **, size_t *));
+static int __repmgr_addrcmp __P((repmgr_netaddr_t *, repmgr_netaddr_t *));
+static int __repmgr_find_commit __P((ENV *, DB_LSN *, DB_LSN *, int *));
+static int __repmgr_remote_lsnhist(ENV *, int, u_int32_t,
+ __repmgr_lsnhist_match_args *);
/*
* Schedules a future attempt to re-establish a connection with the given site.
@@ -43,6 +47,8 @@ __repmgr_schedule_connection_attempt(env, eid, immediate)
REP *rep;
REPMGR_RETRY *retry, *target;
REPMGR_SITE *site;
+ SITEINFO *sites;
+ db_timeout_t timeout;
db_timespec t;
int ret;
@@ -57,7 +63,24 @@ __repmgr_schedule_connection_attempt(env, eid, immediate)
if (immediate)
TAILQ_INSERT_HEAD(&db_rep->retries, retry, entries);
else {
- TIMESPEC_ADD_DB_TIMEOUT(&t, rep->connection_retry_wait);
+ /*
+ * Normally we retry a connection after connection retry
+ * timeout. In a subordinate rep-aware process, we retry sooner
+ * when there is a listener candidate on the disconnected site.
+ * The listener process will be connected from the new listener,
+ * but subordinate rep-aware process can only wait for retry.
+ * It matters when the subordinate process becomes listener and
+ * the disconnected site is master. The m_listener_wait is set
+ * to retry after enough time has passed for a takeover. The
+ * number of listener candidates is maintained in the listener
+ * process as it has connections to all subordinate processes
+ * from other sites.
+ */
+ timeout = rep->connection_retry_wait;
+ CHECK_LISTENER_CAND(timeout, >0, db_rep->m_listener_wait,
+ timeout);
+ TIMESPEC_ADD_DB_TIMEOUT(&t, timeout);
+
/*
* Insert the new "retry" on the (time-ordered) list in its
* proper position. To do so, find the list entry ("target")
@@ -284,6 +307,7 @@ __repmgr_new_site(env, sitep, host, port)
site->net_addr.host = p;
site->net_addr.port = (u_int16_t)port;
+ site->max_ack_gen = 0;
ZERO_LSN(site->max_ack);
site->ack_policy = 0;
site->alignment = 0;
@@ -295,6 +319,7 @@ __repmgr_new_site(env, sitep, host, port)
site->state = SITE_IDLE;
site->membership = 0;
+ site->gmdb_flags = 0;
site->config = 0;
*sitep = site;
@@ -535,11 +560,14 @@ __repmgr_thread_failure(env, why)
int why;
{
DB_REP *db_rep;
+ DB_THREAD_INFO *ip;
db_rep = env->rep_handle;
+ ENV_ENTER(env, ip);
LOCK_MUTEX(db_rep->mutex);
(void)__repmgr_stop_threads(env);
UNLOCK_MUTEX(db_rep->mutex);
+ ENV_LEAVE(env, ip);
return (__env_panic(env, why));
}
@@ -597,12 +625,13 @@ __repmgr_format_addr_loc(addr, buffer)
}
/*
- * PUBLIC: int __repmgr_repstart __P((ENV *, u_int32_t));
+ * PUBLIC: int __repmgr_repstart __P((ENV *, u_int32_t, u_int32_t));
*/
int
-__repmgr_repstart(env, flags)
+__repmgr_repstart(env, flags, startopts)
ENV *env;
u_int32_t flags;
+ u_int32_t startopts;
{
DBT my_addr;
int ret;
@@ -610,7 +639,11 @@ __repmgr_repstart(env, flags)
/* Include "cdata" in case sending to old-version site. */
if ((ret = __repmgr_prepare_my_addr(env, &my_addr)) != 0)
return (ret);
- ret = __rep_start_int(env, &my_addr, flags);
+ /*
+ * force_role_chg and hold_client_gen are used by preferred master
+ * mode to help control site startup.
+ */
+ ret = __rep_start_int(env, &my_addr, flags, startopts);
__os_free(env, my_addr.data);
if (ret != 0)
__db_err(env, ret, DB_STR("3673", "rep_start"));
@@ -618,11 +651,12 @@ __repmgr_repstart(env, flags)
}
/*
- * PUBLIC: int __repmgr_become_master __P((ENV *));
+ * PUBLIC: int __repmgr_become_master __P((ENV *, u_int32_t));
*/
int
-__repmgr_become_master(env)
+__repmgr_become_master(env, startopts)
ENV *env;
+ u_int32_t startopts;
{
DB_REP *db_rep;
DB_THREAD_INFO *ip;
@@ -631,7 +665,7 @@ __repmgr_become_master(env)
REPMGR_SITE *site;
DBT key_dbt, data_dbt;
__repmgr_membership_key_args key;
- __repmgr_membership_data_args member_status;
+ __repmgr_membership_data_args member_data;
repmgr_netaddr_t addr;
u_int32_t status;
u_int8_t data_buf[__REPMGR_MEMBERSHIP_DATA_SIZE];
@@ -668,16 +702,23 @@ __repmgr_become_master(env)
db_rep->client_intent = FALSE;
UNLOCK_MUTEX(db_rep->mutex);
- if ((ret = __repmgr_repstart(env, DB_REP_MASTER)) != 0)
+ if ((ret = __repmgr_repstart(env, DB_REP_MASTER, startopts)) != 0)
return (ret);
+ /*
+ * Make sure member_version_gen is current so that this master
+ * can reject obsolete member lists from other sites.
+ */
+ db_rep->member_version_gen = db_rep->region->gen;
+
+ /* If there is already a gmdb, we are finished. */
if (db_rep->have_gmdb)
return (0);
- db_rep->member_version_gen = db_rep->region->gen;
- ENV_ENTER(env, ip);
+ /* There isn't a gmdb. Create one from the in-memory site list. */
if ((ret = __repmgr_hold_master_role(env, NULL)) != 0)
goto leave;
+ ENV_GET_THREAD_INFO(env, ip);
retry:
if ((ret = __repmgr_setup_gmdb_op(env, ip, &txn, DB_CREATE)) != 0)
goto err;
@@ -705,8 +746,9 @@ retry:
&key, key_buf, sizeof(key_buf), &len);
DB_ASSERT(env, ret == 0);
DB_INIT_DBT(key_dbt, key_buf, len);
- member_status.flags = status;
- __repmgr_membership_data_marshal(env, &member_status, data_buf);
+ member_data.status = status;
+ member_data.flags = site->gmdb_flags;
+ __repmgr_membership_data_marshal(env, &member_data, data_buf);
DB_INIT_DBT(data_dbt, data_buf, __REPMGR_MEMBERSHIP_DATA_SIZE);
if ((ret = __db_put(dbp, ip, txn, &key_dbt, &data_dbt, 0)) != 0)
goto err;
@@ -726,7 +768,6 @@ err:
if ((t_ret = __repmgr_rlse_master_role(env)) != 0 && ret == 0)
ret = t_ret;
leave:
- ENV_LEAVE(env, ip);
return (ret);
}
@@ -840,6 +881,14 @@ __repmgr_open(env, rep_)
rep->election_retry_wait = db_rep->election_retry_wait;
rep->heartbeat_monitor_timeout = db_rep->heartbeat_monitor_timeout;
rep->heartbeat_frequency = db_rep->heartbeat_frequency;
+ rep->inqueue_max_gbytes = db_rep->inqueue_max_gbytes;
+ rep->inqueue_max_bytes = db_rep->inqueue_max_bytes;
+ if (rep->inqueue_max_gbytes == 0 && rep->inqueue_max_bytes == 0) {
+ rep->inqueue_max_bytes = DB_REPMGR_DEFAULT_INQUEUE_MAX;
+ }
+ __repmgr_set_incoming_queue_redzone(rep, rep->inqueue_max_gbytes,
+ rep->inqueue_max_bytes);
+
return (ret);
}
@@ -958,6 +1007,18 @@ __repmgr_join(env, rep_)
}
db_rep->siteinfo_seq = rep->siteinfo_seq;
+ /*
+ * Update the incoming queue limit settings if necessary.
+ */
+ if ((db_rep->inqueue_max_gbytes != 0 ||
+ db_rep->inqueue_max_bytes != 0) &&
+ (db_rep->inqueue_max_gbytes != rep->inqueue_max_gbytes ||
+ db_rep->inqueue_max_bytes != rep->inqueue_max_gbytes)) {
+ rep->inqueue_max_gbytes = db_rep->inqueue_max_gbytes;
+ rep->inqueue_max_bytes = db_rep->inqueue_max_bytes;
+ __repmgr_set_incoming_queue_redzone(rep,
+ rep->inqueue_max_gbytes, rep->inqueue_max_bytes);
+ }
unlock:
MUTEX_UNLOCK(env, rep->mtx_repmgr);
return (ret);
@@ -1073,6 +1134,7 @@ __repmgr_share_netaddrs(env, rep_, start, limit)
shared_array[eid].addr.port = db_rep->sites[i].net_addr.port;
shared_array[eid].config = db_rep->sites[i].config;
shared_array[eid].status = db_rep->sites[i].membership;
+ shared_array[eid].flags = db_rep->sites[i].gmdb_flags;
RPRINT(env, (env, DB_VERB_REPMGR_MISC,
"EID %d is assigned for site %s:%lu",
eid, host, (u_long)shared_array[eid].addr.port));
@@ -1134,6 +1196,7 @@ __repmgr_copy_in_added_sites(env)
site = SITE_FROM_EID(i);
site->config = p->config;
site->membership = p->status;
+ site->gmdb_flags = p->flags;
}
out:
@@ -1266,7 +1329,9 @@ __repmgr_stable_lsn(env, stable_lsn)
db_rep = env->rep_handle;
rep = db_rep->region;
- if (rep->min_log_file != 0 && rep->min_log_file < stable_lsn->file) {
+ LOCK_MUTEX(db_rep->mutex);
+ if (rep->sites_avail != 0 && rep->min_log_file != 0 &&
+ rep->min_log_file < stable_lsn->file) {
/*
* Returning an LSN to be consistent with the rest of the
* log archiving processing. Construct LSN of format
@@ -1276,12 +1341,91 @@ __repmgr_stable_lsn(env, stable_lsn)
stable_lsn->offset = 0;
}
RPRINT(env, (env, DB_VERB_REPMGR_MISC,
- "Repmgr_stable_lsn: Returning stable_lsn[%lu][%lu]",
- (u_long)stable_lsn->file, (u_long)stable_lsn->offset));
+"Repmgr_stable_lsn: Returning stable_lsn[%lu][%lu] sites_avail %lu min_log %lu",
+ (u_long)stable_lsn->file, (u_long)stable_lsn->offset,
+ (u_long)rep->sites_avail, (u_long)rep->min_log_file));
+ UNLOCK_MUTEX(db_rep->mutex);
return (0);
}
/*
+ * PUBLIC: int __repmgr_make_request_conn __P((ENV *,
+ * PUBLIC: repmgr_netaddr_t *, REPMGR_CONNECTION **));
+ */
+int
+__repmgr_make_request_conn(env, addr, connp)
+ ENV *env;
+ repmgr_netaddr_t *addr;
+ REPMGR_CONNECTION **connp;
+{
+ DBT vi;
+ __repmgr_msg_hdr_args msg_hdr;
+ __repmgr_version_confirmation_args conf;
+ REPMGR_CONNECTION *conn;
+ int alloc, ret, unused;
+
+ alloc = FALSE;
+ if ((ret = __repmgr_connect(env, addr, &conn, &unused)) != 0)
+ return (ret);
+ conn->type = APP_CONNECTION;
+
+ /* Read a handshake msg, to get version confirmation and parameters. */
+ if ((ret = __repmgr_read_conn(conn)) != 0)
+ goto err;
+ /*
+ * We can only get here after having read the full 9 bytes that we
+ * expect, so this can't fail.
+ */
+ DB_ASSERT(env, conn->reading_phase == SIZES_PHASE);
+ ret = __repmgr_msg_hdr_unmarshal(env, &msg_hdr,
+ conn->msg_hdr_buf, __REPMGR_MSG_HDR_SIZE, NULL);
+ DB_ASSERT(env, ret == 0);
+ __repmgr_iovec_init(&conn->iovecs);
+ conn->reading_phase = DATA_PHASE;
+
+ if ((ret = __repmgr_prepare_simple_input(env, conn, &msg_hdr)) != 0)
+ goto err;
+ alloc = TRUE;
+
+ if ((ret = __repmgr_read_conn(conn)) != 0)
+ goto err;
+
+ /*
+ * Analyze the handshake msg, and stash relevant info.
+ */
+ if ((ret = __repmgr_find_version_info(env, conn, &vi)) != 0)
+ goto err;
+ DB_ASSERT(env, vi.size > 0);
+ if ((ret = __repmgr_version_confirmation_unmarshal(env,
+ &conf, vi.data, vi.size, NULL)) != 0)
+ goto err;
+
+ if (conf.version < GM_MIN_VERSION ||
+ (IS_VIEW_SITE(env) && conf.version < VIEW_MIN_VERSION) ||
+ (PREFMAS_IS_SET(env) && conf.version < PREFMAS_MIN_VERSION)) {
+ ret = DB_REP_UNAVAIL;
+ goto err;
+ }
+ conn->version = conf.version;
+
+err:
+ if (alloc) {
+ DB_ASSERT(env, conn->input.repmgr_msg.cntrl.size > 0);
+ __os_free(env, conn->input.repmgr_msg.cntrl.data);
+ DB_ASSERT(env, conn->input.repmgr_msg.rec.size > 0);
+ __os_free(env, conn->input.repmgr_msg.rec.data);
+ }
+ __repmgr_reset_for_reading(conn);
+ if (ret == 0)
+ *connp = conn;
+ else {
+ (void)__repmgr_close_connection(env, conn);
+ (void)__repmgr_destroy_conn(env, conn);
+ }
+ return (ret);
+}
+
+/*
* PUBLIC: int __repmgr_send_sync_msg __P((ENV *, REPMGR_CONNECTION *,
* PUBLIC: u_int32_t, u_int8_t *, u_int32_t));
*/
@@ -1311,15 +1455,511 @@ __repmgr_send_sync_msg(env, conn, type, buf, len)
}
/*
+ * Reads a whole message, when we expect to get a REPMGR_OWN_MSG.
+ */
+/*
+ * PUBLIC: int __repmgr_read_own_msg __P((ENV *, REPMGR_CONNECTION *,
+ * PUBLIC: u_int32_t *, u_int8_t **, size_t *));
+ */
+int
+__repmgr_read_own_msg(env, conn, typep, bufp, lenp)
+ ENV *env;
+ REPMGR_CONNECTION *conn;
+ u_int32_t *typep;
+ u_int8_t **bufp;
+ size_t *lenp;
+{
+ __repmgr_msg_hdr_args msg_hdr;
+ u_int8_t *buf;
+ u_int32_t type;
+ size_t size;
+ int ret;
+
+ __repmgr_reset_for_reading(conn);
+ if ((ret = __repmgr_read_conn(conn)) != 0)
+ goto err;
+ ret = __repmgr_msg_hdr_unmarshal(env, &msg_hdr,
+ conn->msg_hdr_buf, __REPMGR_MSG_HDR_SIZE, NULL);
+ DB_ASSERT(env, ret == 0);
+
+ if ((conn->msg_type = msg_hdr.type) != REPMGR_OWN_MSG) {
+ ret = DB_REP_UNAVAIL; /* Protocol violation. */
+ goto err;
+ }
+ type = REPMGR_OWN_MSG_TYPE(msg_hdr);
+ if ((size = (size_t)REPMGR_OWN_BUF_SIZE(msg_hdr)) > 0) {
+ conn->reading_phase = DATA_PHASE;
+ __repmgr_iovec_init(&conn->iovecs);
+
+ if ((ret = __os_malloc(env, size, &buf)) != 0)
+ goto err;
+ conn->input.rep_message = NULL;
+
+ __repmgr_add_buffer(&conn->iovecs, buf, size);
+ if ((ret = __repmgr_read_conn(conn)) != 0) {
+ __os_free(env, buf);
+ goto err;
+ }
+ *bufp = buf;
+ }
+
+ *typep = type;
+ *lenp = size;
+
+err:
+ return (ret);
+}
+
+/*
+ * Returns TRUE if we are connected to the other site in a preferred
+ * master replication group, FALSE otherwise.
+ *
+ * PUBLIC: int __repmgr_prefmas_connected __P((ENV *));
+ */
+int
+__repmgr_prefmas_connected(env)
+ ENV *env;
+{
+ DB_REP *db_rep;
+ REPMGR_CONNECTION *conn;
+ REPMGR_SITE *other_site;
+
+ db_rep = env->rep_handle;
+
+ /*
+ * Preferred master mode only has 2 sites, so the other site is
+ * always EID 1.
+ */
+ if (!IS_PREFMAS_MODE(env) || !IS_KNOWN_REMOTE_SITE(1))
+ return (FALSE);
+
+ other_site = SITE_FROM_EID(1);
+ if (other_site->state == SITE_CONNECTED)
+ return (TRUE);
+
+ if ((conn = other_site->ref.conn.in) != NULL &&
+ IS_READY_STATE(conn->state))
+ return (TRUE);
+ if ((conn = other_site->ref.conn.out) != NULL &&
+ IS_READY_STATE(conn->state))
+ return (TRUE);
+
+ return (FALSE);
+}
+
+/*
+ * Used by a preferred master site to restart the remote temporary master
+ * site as a client. This is used to help guarantee that the preferred master
+ * site's transactions are never rolled back.
+ *
+ * PUBLIC: int __repmgr_restart_site_as_client __P((ENV *, int));
+ */
+int
+__repmgr_restart_site_as_client(env, eid)
+ ENV *env;
+ int eid;
+{
+ DB_REP *db_rep;
+ REPMGR_CONNECTION *conn;
+ repmgr_netaddr_t addr;
+ u_int32_t type;
+ size_t len;
+ u_int8_t any_value, *response_buf;
+ int ret, t_ret;
+
+ COMPQUIET(any_value, 0);
+ db_rep = env->rep_handle;
+ conn = NULL;
+
+ if (!IS_PREFMAS_MODE(env))
+ return (0);
+
+ LOCK_MUTEX(db_rep->mutex);
+ addr = SITE_FROM_EID(eid)->net_addr;
+ UNLOCK_MUTEX(db_rep->mutex);
+ if ((ret = __repmgr_make_request_conn(env, &addr, &conn)) != 0)
+ return (ret);
+
+ /*
+ * No payload needed, but must send at least a dummy byte for the
+ * other side to recognize that a message has arrived.
+ */
+ if ((ret = __repmgr_send_sync_msg(env, conn,
+ REPMGR_RESTART_CLIENT, VOID_STAR_CAST &any_value, 1)) != 0)
+ goto err;
+
+ if ((ret = __repmgr_read_own_msg(env,
+ conn, &type, &response_buf, &len)) != 0)
+ goto err;
+ if (type != REPMGR_PREFMAS_SUCCESS) {
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "restart_site_as_client got unexpected message type %d",
+ type));
+ ret = DB_REP_UNAVAIL; /* Invalid response: protocol violation */
+ }
+err:
+ if (conn != NULL) {
+ if ((t_ret = __repmgr_close_connection(env,
+ conn)) != 0 && ret != 0)
+ ret = t_ret;
+ if ((t_ret = __repmgr_destroy_conn(env,
+ conn)) != 0 && ret != 0)
+ ret = t_ret;
+ }
+ return (ret);
+}
+
+/*
+ * Used by a preferred master site to make the remote temporary master
+ * site a readonly master. This is used to help preserve all temporary
+ * master transactions.
+ *
+ * PUBLIC: int __repmgr_make_site_readonly_master __P((ENV *, int,
+ * PUBLIC: u_int32_t *, DB_LSN *));
+ */
+int
+__repmgr_make_site_readonly_master(env, eid, gen, sync_lsnp)
+ ENV *env;
+ int eid;
+ u_int32_t *gen;
+ DB_LSN *sync_lsnp;
+{
+ DB_REP *db_rep;
+ REPMGR_CONNECTION *conn;
+ repmgr_netaddr_t addr;
+ __repmgr_permlsn_args permlsn;
+ u_int32_t type;
+ size_t len;
+ u_int8_t any_value, *response_buf;
+ int ret, t_ret;
+
+ COMPQUIET(any_value, 0);
+ db_rep = env->rep_handle;
+ conn = NULL;
+ response_buf = NULL;
+ *gen = 0;
+ ZERO_LSN(*sync_lsnp);
+
+ if (!IS_PREFMAS_MODE(env))
+ return (0);
+
+ LOCK_MUTEX(db_rep->mutex);
+ addr = SITE_FROM_EID(eid)->net_addr;
+ UNLOCK_MUTEX(db_rep->mutex);
+ if ((ret = __repmgr_make_request_conn(env, &addr, &conn)) != 0)
+ return (ret);
+
+ /*
+ * No payload needed, but must send at least a dummy byte for the
+ * other side to recognize that a message has arrived.
+ */
+ if ((ret = __repmgr_send_sync_msg(env, conn,
+ REPMGR_READONLY_MASTER, VOID_STAR_CAST &any_value, 1)) != 0)
+ goto err;
+
+ if ((ret = __repmgr_read_own_msg(env,
+ conn, &type, &response_buf, &len)) != 0)
+ goto err;
+
+ if (type == REPMGR_READONLY_RESPONSE) {
+ if ((ret = __repmgr_permlsn_unmarshal(env,
+ &permlsn, response_buf, len, NULL)) != 0)
+ goto err;
+ *gen = permlsn.generation;
+ *sync_lsnp = permlsn.lsn;
+ } else {
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "make_site_readonly_master got unexpected message type %d",
+ type));
+ ret = DB_REP_UNAVAIL; /* Invalid response: protocol violation */
+ }
+
+err:
+ if (conn != NULL) {
+ if ((t_ret = __repmgr_close_connection(env,
+ conn)) != 0 && ret != 0)
+ ret = t_ret;
+ if ((t_ret = __repmgr_destroy_conn(env,
+ conn)) != 0 && ret != 0)
+ ret = t_ret;
+ }
+ if (response_buf != NULL)
+ __os_free(env, response_buf);
+ return (ret);
+}
+
+/*
+ * Used by a preferred master site to perform the LSN history comparisons to
+ * determine whether there is are continuous or conflicting sets of
+ * transactions between this site and the remote temporary master.
+ *
+ * PUBLIC: int __repmgr_lsnhist_match __P((ENV *,
+ * PUBLIC: DB_THREAD_INFO *, int, int *));
+ */
+int
+__repmgr_lsnhist_match(env, ip, eid, match)
+ ENV *env;
+ DB_THREAD_INFO *ip;
+ int eid;
+ int *match;
+{
+ DB_REP *db_rep;
+ REP *rep;
+ __rep_lsn_hist_data_args my_lsnhist;
+ __repmgr_lsnhist_match_args remote_lsnhist;
+ u_int32_t my_gen;
+ int found_commit, ret;
+
+ db_rep = env->rep_handle;
+ rep = db_rep->region;
+ *match = FALSE;
+ my_gen = rep->gen;
+ found_commit = FALSE;
+
+ if (!IS_PREFMAS_MODE(env))
+ return (0);
+
+ /* Get local LSN history information for comparison. */
+ if ((ret = __rep_get_lsnhist_data(env, ip, my_gen, &my_lsnhist)) != 0)
+ return (ret);
+
+ /* Get remote LSN history information for comparison. */
+ ret = __repmgr_remote_lsnhist(env, eid, my_gen, &remote_lsnhist);
+
+ /*
+ * If the current gen doesn't exist at the remote site, the match
+ * fails.
+ *
+ * If the remote LSN or timestamp at the current gen doesn't match
+ * ours, we probably had a whack-a-mole situation where each site
+ * as up and down in isolation one or more times and the match fails.
+ *
+ * If the remote LSN for the next generation is lower than this
+ * site's startup LSN and there are any commit operations between
+ * these LSNs, there are conflicting sets of transactions and the
+ * match fails.
+ */
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "lsnhist_match my_lsn [%lu][%lu] remote_lsn [%lu][%lu]",
+ (u_long)my_lsnhist.lsn.file, (u_long)my_lsnhist.lsn.offset,
+ (u_long)remote_lsnhist.lsn.file,
+ (u_long)remote_lsnhist.lsn.offset));
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "lsnhist_match my_time %lu:%lu remote_time %lu:%lu",
+ (u_long)my_lsnhist.hist_sec, (u_long)my_lsnhist.hist_nsec,
+ (u_long)remote_lsnhist.hist_sec, (u_long)remote_lsnhist.hist_nsec));
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "lsnhist_match pminit_lsn [%lu][%lu] next_gen_lsn [%lu][%lu]",
+ (u_long)db_rep->prefmas_init_lsn.file,
+ (u_long)db_rep->prefmas_init_lsn.offset,
+ (u_long)remote_lsnhist.next_gen_lsn.file,
+ (u_long)remote_lsnhist.next_gen_lsn.offset));
+ if (ret != DB_REP_UNAVAIL &&
+ LOG_COMPARE(&my_lsnhist.lsn, &remote_lsnhist.lsn) == 0 &&
+ my_lsnhist.hist_sec == remote_lsnhist.hist_sec &&
+ my_lsnhist.hist_nsec == remote_lsnhist.hist_nsec) {
+ /*
+ * If the remote site doesn't yet have the next gen or if
+ * our startup LSN is <= than the remote next gen LSN, we
+ * have a match.
+ *
+ * Otherwise, our startup LSN is higher than the remote
+ * next gen LSN. If we have any commit operations between
+ * these two LSNs, we have preferred master operations we
+ * must preserve and there is not a match. But if we just
+ * have uncommitted operations between these LSNs it doesn't
+ * matter if they are rolled back, so we call it a match and
+ * try to retain temporary master transactions if possible.
+ */
+ if (IS_ZERO_LSN(remote_lsnhist.next_gen_lsn) ||
+ LOG_COMPARE(&db_rep->prefmas_init_lsn,
+ &remote_lsnhist.next_gen_lsn) <= 0)
+ *match = TRUE;
+ else if ((ret = __repmgr_find_commit(env,
+ &remote_lsnhist.next_gen_lsn,
+ &db_rep->prefmas_init_lsn, &found_commit)) == 0 &&
+ !found_commit) {
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "lsnhist_match !found_commit set match TRUE"));
+ *match = TRUE;
+ }
+ }
+
+ /* Don't return an error if current gen didn't exist at remote site. */
+ if (ret == DB_REP_UNAVAIL)
+ ret = 0;
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "lsnhist_match match %d returning %d", *match, ret));
+ return (ret);
+}
+
+/*
+ * Checks a range of log records from low_lsn to high_lsn for any
+ * commit operations. Sets found_commit to TRUE if a commit is
+ * found.
+ */
+static int
+__repmgr_find_commit(env, low_lsn, high_lsn, found_commit)
+ ENV *env;
+ DB_LSN *low_lsn;
+ DB_LSN *high_lsn;
+ int *found_commit;
+{
+ DB_LOGC *logc;
+ DB_LSN lsn;
+ DBT rec;
+ __txn_regop_args *txn_args;
+ u_int32_t rectype;
+ int ret, t_ret;
+
+ *found_commit = FALSE;
+ ret = 0;
+
+ lsn = *low_lsn;
+ if ((ret = __log_cursor(env, &logc)) != 0)
+ return (ret);
+ memset(&rec, 0, sizeof(rec));
+ if (__logc_get(logc, &lsn, &rec, DB_SET) == 0) {
+ do {
+ LOGCOPY_32(env, &rectype, rec.data);
+ if (rectype == DB___txn_regop) {
+ if ((ret = __txn_regop_read(
+ env, rec.data, &txn_args)) != 0)
+ goto close_cursor;
+ if (txn_args->opcode == TXN_COMMIT) {
+ *found_commit = TRUE;
+ __os_free(env, txn_args);
+ break;
+ }
+ __os_free(env, txn_args);
+ }
+ } while ((ret = __logc_get(logc, &lsn, &rec, DB_NEXT)) == 0 &&
+ LOG_COMPARE(&lsn, high_lsn) <= 0);
+ }
+close_cursor:
+ if ((t_ret = __logc_close(logc)) != 0 && ret == 0)
+ ret = t_ret;
+ return (ret);
+}
+
+/*
+ * Used by a preferred master site to get remote LSN history information
+ * from the other site in the replication group.
+ */
+static int
+__repmgr_remote_lsnhist(env, eid, gen, lsnhist_match)
+ ENV *env;
+ int eid;
+ u_int32_t gen;
+ __repmgr_lsnhist_match_args *lsnhist_match;
+{
+ DB_REP *db_rep;
+ REPMGR_CONNECTION *conn;
+ repmgr_netaddr_t addr;
+ __rep_lsn_hist_key_args lsnhist_key;
+ u_int8_t lsnhist_key_buf[__REP_LSN_HIST_KEY_SIZE];
+ u_int32_t type;
+ size_t len;
+ u_int8_t *response_buf;
+ int ret, t_ret;
+
+ db_rep = env->rep_handle;
+ conn = NULL;
+ response_buf = NULL;
+
+ if (!IS_KNOWN_REMOTE_SITE(eid))
+ return (0);
+
+ LOCK_MUTEX(db_rep->mutex);
+ addr = SITE_FROM_EID(eid)->net_addr;
+ UNLOCK_MUTEX(db_rep->mutex);
+ if ((ret = __repmgr_make_request_conn(env, &addr, &conn)) != 0)
+ return (ret);
+
+ /* Marshal generation for which to request remote lsnhist data. */
+ lsnhist_key.version = REP_LSN_HISTORY_FMT_VERSION;
+ lsnhist_key.gen = gen;
+ __rep_lsn_hist_key_marshal(env, &lsnhist_key, lsnhist_key_buf);
+ if ((ret = __repmgr_send_sync_msg(env, conn, REPMGR_LSNHIST_REQUEST,
+ lsnhist_key_buf, sizeof(lsnhist_key_buf))) != 0)
+ goto err;
+
+ if ((ret = __repmgr_read_own_msg(env,
+ conn, &type, &response_buf, &len)) != 0)
+ goto err;
+
+ /* Unmarshal remote lsnhist time and LSNs for comparison. */
+ if (type == REPMGR_LSNHIST_RESPONSE) {
+ if ((ret = __repmgr_lsnhist_match_unmarshal(env, lsnhist_match,
+ response_buf, __REPMGR_LSNHIST_MATCH_SIZE, NULL)) != 0)
+ goto err;
+ } else {
+ /*
+ * If the other site sent back REPMGR_PREFMAS_FAILURE, it means
+ * no lsnhist record for the requested gen was found on other
+ * site.
+ */
+ if (type != REPMGR_PREFMAS_FAILURE)
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "remote_lsnhist got unexpected message type %d",
+ type));
+ ret = DB_REP_UNAVAIL;
+ }
+
+err:
+ if (conn != NULL) {
+ if ((t_ret = __repmgr_close_connection(env,
+ conn)) != 0 && ret != 0)
+ ret = t_ret;
+ if ((t_ret = __repmgr_destroy_conn(env,
+ conn)) != 0 && ret != 0)
+ ret = t_ret;
+ }
+ if (response_buf != NULL)
+ __os_free(env, response_buf);
+ return (ret);
+}
+
+/*
+ * Returns the number of tries and the amount of time to yield the
+ * processor for preferred master waits. The total wait is the larger
+ * of 2 seconds or 3 * ack_timeout.
+ *
+ * PUBLIC: int __repmgr_prefmas_get_wait __P((ENV *, u_int32_t *, u_long *));
+ */
+int
+__repmgr_prefmas_get_wait(env, tries, yield_usecs)
+ ENV *env;
+ u_int32_t *tries;
+ u_long *yield_usecs;
+{
+ DB_REP *db_rep;
+ REP *rep;
+ db_timeout_t max_wait;
+
+ db_rep = env->rep_handle;
+ rep = db_rep->region;
+
+ *yield_usecs = 250000;
+ max_wait = DB_REPMGR_DEFAULT_ACK_TIMEOUT * 2;
+ if ((rep->ack_timeout * 3) > max_wait)
+ max_wait = rep->ack_timeout * 3;
+ *tries = max_wait / (u_int32_t)*yield_usecs;
+ return (0);
+}
+
+/*
* Produce a membership list from the known info currently in memory.
*
- * PUBLIC: int __repmgr_marshal_member_list __P((ENV *, u_int8_t **, size_t *));
+ * PUBLIC: int __repmgr_marshal_member_list __P((ENV *, u_int32_t,
+ * PUBLIC: u_int8_t **, size_t *));
*
* Caller must hold mutex.
*/
int
-__repmgr_marshal_member_list(env, bufp, lenp)
+__repmgr_marshal_member_list(env, msg_version, bufp, lenp)
ENV *env;
+ u_int32_t msg_version;
u_int8_t **bufp;
size_t *lenp;
{
@@ -1328,6 +1968,7 @@ __repmgr_marshal_member_list(env, bufp, lenp)
REPMGR_SITE *site;
__repmgr_membr_vers_args membr_vers;
__repmgr_site_info_args site_info;
+ __repmgr_v4site_info_args v4site_info;
u_int8_t *buf, *p;
size_t bufsize, len;
u_int i;
@@ -1353,14 +1994,24 @@ __repmgr_marshal_member_list(env, bufp, lenp)
if (site->membership == 0)
continue;
- site_info.host.data = site->net_addr.host;
- site_info.host.size =
- (u_int32_t)strlen(site->net_addr.host) + 1;
- site_info.port = site->net_addr.port;
- site_info.flags = site->membership;
-
- ret = __repmgr_site_info_marshal(env,
- &site_info, p, (size_t)(&buf[bufsize]-p), &len);
+ if (msg_version < 5) {
+ v4site_info.host.data = site->net_addr.host;
+ v4site_info.host.size =
+ (u_int32_t)strlen(site->net_addr.host) + 1;
+ v4site_info.port = site->net_addr.port;
+ v4site_info.flags = site->membership;
+ ret = __repmgr_v4site_info_marshal(env,
+ &v4site_info, p, (size_t)(&buf[bufsize]-p), &len);
+ } else {
+ site_info.host.data = site->net_addr.host;
+ site_info.host.size =
+ (u_int32_t)strlen(site->net_addr.host) + 1;
+ site_info.port = site->net_addr.port;
+ site_info.status = site->membership;
+ site_info.flags = site->gmdb_flags;
+ ret = __repmgr_site_info_marshal(env,
+ &site_info, p, (size_t)(&buf[bufsize]-p), &len);
+ }
DB_ASSERT(env, ret == 0);
p += len;
}
@@ -1387,7 +2038,7 @@ read_gmdb(env, ip, bufp, lenp)
DBC *dbc;
DBT key_dbt, data_dbt;
__repmgr_membership_key_args key;
- __repmgr_membership_data_args member_status;
+ __repmgr_membership_data_args member_data;
__repmgr_member_metadata_args metadata;
__repmgr_membr_vers_args membr_vers;
__repmgr_site_info_args site_info;
@@ -1435,8 +2086,13 @@ read_gmdb(env, ip, bufp, lenp)
ret = __repmgr_member_metadata_unmarshal(env,
&metadata, metadata_buf, data_dbt.size, NULL);
DB_ASSERT(env, ret == 0);
- DB_ASSERT(env, metadata.format == REPMGR_GMDB_FMT_VERSION);
+ DB_ASSERT(env, metadata.format >= REPMGR_GMDB_FMT_MIN_VERSION &&
+ metadata.format <= REPMGR_GMDB_FMT_VERSION);
DB_ASSERT(env, metadata.version > 0);
+ /* Automatic conversion of old format gmdb if needed. */
+ if (metadata.format < REPMGR_GMDB_FMT_VERSION &&
+ (ret = convert_gmdb(env, ip, dbp, txn)) != 0)
+ goto err;
bufsize = 1000; /* Initial guess. */
if ((ret = __os_malloc(env, bufsize, &buf)) != 0)
@@ -1459,13 +2115,14 @@ read_gmdb(env, ip, bufp, lenp)
DB_ASSERT(env, key.port > 0);
ret = __repmgr_membership_data_unmarshal(env,
- &member_status, data_buf, data_dbt.size, NULL);
+ &member_data, data_buf, data_dbt.size, NULL);
DB_ASSERT(env, ret == 0);
- DB_ASSERT(env, member_status.flags != 0);
+ DB_ASSERT(env, member_data.status != 0);
site_info.host = key.host;
site_info.port = key.port;
- site_info.flags = member_status.flags;
+ site_info.status = member_data.status;
+ site_info.flags = member_data.flags;
if ((ret = __repmgr_site_info_marshal(env, &site_info,
p, (size_t)(&buf[bufsize]-p), &len)) == ENOMEM) {
bufsize *= 2;
@@ -1501,28 +2158,129 @@ err:
}
/*
+ * Convert an older-format group membership database into the current format.
+ */
+static int
+convert_gmdb(env, ip, dbp, txn)
+ ENV *env;
+ DB_THREAD_INFO *ip;
+ DB *dbp;
+ DB_TXN *txn;
+{
+ DBC *dbc;
+ DBT key_dbt, data_dbt, v4data_dbt;
+ __repmgr_membership_key_args key;
+ __repmgr_membership_data_args member_data;
+ __repmgr_v4membership_data_args v4member_data;
+ __repmgr_member_metadata_args metadata;
+ u_int8_t data_buf[__REPMGR_MEMBERSHIP_DATA_SIZE];
+ u_int8_t key_buf[MAX_MSG_BUF];
+ u_int8_t metadata_buf[__REPMGR_MEMBER_METADATA_SIZE];
+ u_int8_t v4data_buf[__REPMGR_V4MEMBERSHIP_DATA_SIZE];
+ int ret, t_ret;
+
+ dbc = NULL;
+
+ if ((ret = __db_cursor(dbp, ip, txn, &dbc, 0)) != 0)
+ goto err;
+
+ memset(&key_dbt, 0, sizeof(key_dbt));
+ key_dbt.data = key_buf;
+ key_dbt.ulen = sizeof(key_buf);
+ F_SET(&key_dbt, DB_DBT_USERMEM);
+ memset(&data_dbt, 0, sizeof(data_dbt));
+ data_dbt.data = metadata_buf;
+ data_dbt.ulen = sizeof(metadata_buf);
+ F_SET(&data_dbt, DB_DBT_USERMEM);
+ memset(&v4data_dbt, 0, sizeof(v4data_dbt));
+ v4data_dbt.data = v4data_buf;
+ v4data_dbt.ulen = sizeof(v4data_buf);
+ F_SET(&v4data_dbt, DB_DBT_USERMEM);
+
+ /*
+ * The first gmdb record is a special metadata record that contains
+ * an empty key and gmdb metadata (format and version) and has already
+ * been validated by the caller. We need to update its format value
+ * for this conversion but leave the version alone.
+ */
+ if ((ret = __dbc_get(dbc, &key_dbt, &data_dbt, DB_NEXT)) != 0)
+ goto err;
+ ret = __repmgr_membership_key_unmarshal(env,
+ &key, key_buf, key_dbt.size, NULL);
+ DB_ASSERT(env, ret == 0);
+ DB_ASSERT(env, key.host.size == 0);
+ DB_ASSERT(env, key.port == 0);
+ ret = __repmgr_member_metadata_unmarshal(env,
+ &metadata, metadata_buf, data_dbt.size, NULL);
+ DB_ASSERT(env, ret == 0);
+ DB_ASSERT(env, metadata.version > 0);
+ metadata.format = REPMGR_GMDB_FMT_VERSION;
+ __repmgr_member_metadata_marshal(env, &metadata, metadata_buf);
+ DB_INIT_DBT(data_dbt, metadata_buf, __REPMGR_MEMBER_METADATA_SIZE);
+ if ((ret = __dbc_put(dbc, &key_dbt, &data_dbt, DB_CURRENT)) != 0)
+ goto err;
+
+ /*
+ * The rest of the gmdb records contain a key (host and port) and
+ * membership data (status and now flags). But the old format was
+ * using flags for the status value, so we need to transfer the
+ * old flags value to status and provide an empty flags value for
+ * this conversion.
+ */
+ data_dbt.data = data_buf;
+ data_dbt.ulen = sizeof(data_buf);
+ while ((ret = __dbc_get(dbc, &key_dbt, &v4data_dbt, DB_NEXT)) == 0) {
+ /* Get membership data in old format. */
+ ret = __repmgr_v4membership_data_unmarshal(env,
+ &v4member_data, v4data_buf, v4data_dbt.size, NULL);
+ DB_ASSERT(env, ret == 0);
+ DB_ASSERT(env, v4member_data.flags != 0);
+
+ /* Convert membership data into current format and update. */
+ member_data.status = v4member_data.flags;
+ member_data.flags = 0;
+ __repmgr_membership_data_marshal(env, &member_data, data_buf);
+ DB_INIT_DBT(data_dbt, data_buf, __REPMGR_MEMBERSHIP_DATA_SIZE);
+ if ((ret = __dbc_put(dbc,
+ &key_dbt, &data_dbt, DB_CURRENT)) != 0)
+ goto err;
+ }
+ if (ret == DB_NOTFOUND)
+ ret = 0;
+
+err:
+ if (dbc != NULL && (t_ret = __dbc_close(dbc)) != 0 && ret == 0)
+ ret = t_ret;
+ return (ret);
+}
+
+/*
* Refresh our sites array from the given membership list.
*
* PUBLIC: int __repmgr_refresh_membership __P((ENV *,
- * PUBLIC: u_int8_t *, size_t));
+ * PUBLIC: u_int8_t *, size_t, u_int32_t));
*/
int
-__repmgr_refresh_membership(env, buf, len)
+__repmgr_refresh_membership(env, buf, len, version)
ENV *env;
u_int8_t *buf;
size_t len;
+ u_int32_t version;
{
DB_REP *db_rep;
+ REP *rep;
REPMGR_SITE *site;
__repmgr_membr_vers_args membr_vers;
__repmgr_site_info_args site_info;
+ __repmgr_v4site_info_args v4site_info;
char *host;
u_int8_t *p;
u_int16_t port;
- u_int32_t i, n;
+ u_int32_t i, participants;
int eid, ret;
db_rep = env->rep_handle;
+ rep = db_rep->region;
/*
* Membership list consists of membr_vers followed by a number of
@@ -1546,9 +2304,17 @@ __repmgr_refresh_membership(env, buf, len)
for (i = 0; i < db_rep->site_cnt; i++)
F_CLR(SITE_FROM_EID(i), SITE_TOUCHED);
- for (n = 0; p < &buf[len]; ++n) {
- ret = __repmgr_site_info_unmarshal(env,
- &site_info, p, (size_t)(&buf[len] - p), &p);
+ for (participants = 0; p < &buf[len]; ) {
+ if (version < 5) {
+ ret = __repmgr_v4site_info_unmarshal(env,
+ &v4site_info, p, (size_t)(&buf[len] - p), &p);
+ site_info.host = v4site_info.host;
+ site_info.port = v4site_info.port;
+ site_info.status = v4site_info.flags;
+ site_info.flags = 0;
+ } else
+ ret = __repmgr_site_info_unmarshal(env,
+ &site_info, p, (size_t)(&buf[len] - p), &p);
DB_ASSERT(env, ret == 0);
host = site_info.host.data;
@@ -1556,9 +2322,11 @@ __repmgr_refresh_membership(env, buf, len)
(u_int8_t*)site_info.host.data + site_info.host.size <= p);
host[site_info.host.size-1] = '\0';
port = site_info.port;
+ if (!FLD_ISSET(site_info.flags, SITE_VIEW))
+ participants++;
if ((ret = __repmgr_set_membership(env,
- host, port, site_info.flags)) != 0)
+ host, port, site_info.status, site_info.flags)) != 0)
goto err;
if ((ret = __repmgr_find_site(env, host, port, &eid)) != 0)
@@ -1566,8 +2334,13 @@ __repmgr_refresh_membership(env, buf, len)
DB_ASSERT(env, IS_VALID_EID(eid));
F_SET(SITE_FROM_EID(eid), SITE_TOUCHED);
}
- ret = __rep_set_nsites_int(env, n);
+ ret = __rep_set_nsites_int(env, participants);
DB_ASSERT(env, ret == 0);
+ if (FLD_ISSET(rep->config,
+ REP_C_PREFMAS_MASTER | REP_C_PREFMAS_CLIENT) &&
+ rep->config_nsites > 2)
+ __db_errx(env, DB_STR("3703",
+ "More than two sites in preferred master replication group"));
/* Scan "touched" flags so as to notice sites that have been removed. */
for (i = 0; i < db_rep->site_cnt; i++) {
@@ -1576,7 +2349,8 @@ __repmgr_refresh_membership(env, buf, len)
continue;
host = site->net_addr.host;
port = site->net_addr.port;
- if ((ret = __repmgr_set_membership(env, host, port, 0)) != 0)
+ if ((ret = __repmgr_set_membership(env, host, port,
+ 0, site->gmdb_flags)) != 0)
goto err;
}
@@ -1597,13 +2371,13 @@ __repmgr_reload_gmdb(env)
size_t len;
int ret;
- ENV_ENTER(env, ip);
+ ENV_GET_THREAD_INFO(env, ip);
if ((ret = read_gmdb(env, ip, &buf, &len)) == 0) {
env->rep_handle->have_gmdb = TRUE;
- ret = __repmgr_refresh_membership(env, buf, len);
+ ret = __repmgr_refresh_membership(env, buf, len,
+ DB_REPMGR_VERSION);
__os_free(env, buf);
}
- ENV_LEAVE(env, ip);
return (ret);
}
@@ -1650,7 +2424,8 @@ __repmgr_init_save(env, dbt)
dbt->data = NULL;
dbt->size = 0;
ret = 0;
- } else if ((ret = __repmgr_marshal_member_list(env, &buf, &len)) == 0) {
+ } else if ((ret = __repmgr_marshal_member_list(env,
+ DB_REPMGR_VERSION, &buf, &len)) == 0) {
dbt->data = buf;
dbt->size = (u_int32_t)len;
}
@@ -1700,6 +2475,7 @@ __repmgr_defer_op(env, op)
*/
if ((ret = __os_calloc(env, 1, sizeof(*msg), &msg)) != 0)
return (ret);
+ msg->size = sizeof(*msg);
msg->msg_hdr.type = REPMGR_OWN_MSG;
REPMGR_OWN_MSG_TYPE(msg->msg_hdr) = op;
ret = __repmgr_queue_put(env, msg);
@@ -1771,7 +2547,7 @@ __repmgr_become_client(env)
if ((ret = __repmgr_await_gmdbop(env)) == 0)
db_rep->client_intent = TRUE;
UNLOCK_MUTEX(db_rep->mutex);
- return (ret == 0 ? __repmgr_repstart(env, DB_REP_CLIENT) : ret);
+ return (ret == 0 ? __repmgr_repstart(env, DB_REP_CLIENT, 0) : ret);
}
/*
@@ -1897,16 +2673,17 @@ get_eid(env, host, port, eidp)
* accordingly.
*
* PUBLIC: int __repmgr_set_membership __P((ENV *,
- * PUBLIC: const char *, u_int, u_int32_t));
+ * PUBLIC: const char *, u_int, u_int32_t, u_int32_t));
*
* Caller must host db_rep mutex, and be in ENV_ENTER context.
*/
int
-__repmgr_set_membership(env, host, port, status)
+__repmgr_set_membership(env, host, port, status, flags)
ENV *env;
const char *host;
u_int port;
u_int32_t status;
+ u_int32_t flags;
{
DB_REP *db_rep;
REP *rep;
@@ -1953,7 +2730,9 @@ __repmgr_set_membership(env, host, port, status)
/* Set both private and shared copies of the info. */
site->membership = status;
+ site->gmdb_flags = flags;
sites[eid].status = status;
+ sites[eid].flags = flags;
}
MUTEX_UNLOCK(env, rep->mtx_repmgr);
@@ -1965,7 +2744,8 @@ __repmgr_set_membership(env, host, port, status)
SELECTOR_RUNNING(db_rep)) {
if (eid == db_rep->self_eid && status != SITE_PRESENT)
- ret = DB_DELETED;
+ ret = (status == SITE_ADDING) ?
+ __repmgr_defer_op(env, REPMGR_REJOIN) : DB_DELETED;
else if (orig != SITE_PRESENT && status == SITE_PRESENT &&
site->state == SITE_IDLE) {
/*
@@ -1981,10 +2761,11 @@ __repmgr_set_membership(env, host, port, status)
* failure shouldn't hurt anything, because we'll just
* naturally try again later.
*/
- ret = __repmgr_schedule_connection_attempt(env,
- eid, TRUE);
- if (eid != db_rep->self_eid)
+ if (eid != db_rep->self_eid) {
+ ret = __repmgr_schedule_connection_attempt(env,
+ eid, TRUE);
DB_EVENT(env, DB_EVENT_REP_SITE_ADDED, &eid);
+ }
} else if (orig != 0 && status == 0)
DB_EVENT(env, DB_EVENT_REP_SITE_REMOVED, &eid);
@@ -2084,3 +2865,73 @@ __repmgr_bcast_own_msg(env, type, buf, len)
}
return (0);
}
+
+/*
+ * PUBLIC: int __repmgr_bcast_member_list __P((ENV *));
+ *
+ * Broadcast membership list to all other sites in the replication group.
+ *
+ * Caller must hold mutex.
+ */
+int
+__repmgr_bcast_member_list(env)
+ ENV *env;
+{
+ DB_REP *db_rep;
+ REPMGR_CONNECTION *conn;
+ REPMGR_SITE *site;
+ u_int8_t *buf, *v4buf;
+ size_t len, v4len;
+ int ret;
+ u_int i;
+
+ db_rep = env->rep_handle;
+ if (!SELECTOR_RUNNING(db_rep))
+ return (0);
+ buf = NULL;
+ v4buf = NULL;
+ LOCK_MUTEX(db_rep->mutex);
+ /*
+ * Some of the other sites in the replication group might be at
+ * an older version, so we need to be able to send the membership
+ * list in the current or older format.
+ */
+ if ((ret = __repmgr_marshal_member_list(env,
+ DB_REPMGR_VERSION, &buf, &len)) != 0 ||
+ (ret = __repmgr_marshal_member_list(env,
+ 4, &v4buf, &v4len)) != 0) {
+ UNLOCK_MUTEX(db_rep->mutex);
+ goto out;
+ }
+ UNLOCK_MUTEX(db_rep->mutex);
+
+ RPRINT(env, (env, DB_VERB_REPMGR_MISC,
+ "Broadcast latest membership list"));
+ FOR_EACH_REMOTE_SITE_INDEX(i) {
+ site = SITE_FROM_EID(i);
+ if (site->state != SITE_CONNECTED)
+ continue;
+ if ((conn = site->ref.conn.in) != NULL &&
+ conn->state == CONN_READY &&
+ (ret = __repmgr_send_own_msg(env, conn, REPMGR_SHARING,
+ (conn->version < 5 ? v4buf : buf),
+ (conn->version < 5 ? (u_int32_t) v4len : (u_int32_t)len)))
+ != 0 &&
+ (ret = __repmgr_bust_connection(env, conn)) != 0)
+ goto out;
+ if ((conn = site->ref.conn.out) != NULL &&
+ conn->state == CONN_READY &&
+ (ret = __repmgr_send_own_msg(env, conn, REPMGR_SHARING,
+ (conn->version < 5 ? v4buf : buf),
+ (conn->version < 5 ? (u_int32_t)v4len : (u_int32_t)len)))
+ != 0 &&
+ (ret = __repmgr_bust_connection(env, conn)) != 0)
+ goto out;
+ }
+out:
+ if (buf != NULL)
+ __os_free(env, buf);
+ if (v4buf != NULL)
+ __os_free(env, v4buf);
+ return (ret);
+}
diff --git a/src/repmgr/repmgr_windows.c b/src/repmgr/repmgr_windows.c
index d9c2a03d..8cf05960 100644
--- a/src/repmgr/repmgr_windows.c
+++ b/src/repmgr/repmgr_windows.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2005, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2005, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -252,7 +252,7 @@ allocate_wait_slot(env, resultp, table)
* the previous wait but before reacquiring the mutex, and this
* extra signal would incorrectly cause the next wait to return
* immediately.
- */
+ */
(void)WaitForSingleObject(w->event, 0);
*resultp = i;
return (0);
@@ -639,31 +639,40 @@ __repmgr_select_loop(env)
WSAEVENT listen_event;
WSANETWORKEVENTS net_events;
struct io_info io_info;
- int i;
+ int accept_connect, i;
db_rep = env->rep_handle;
io_info.connections = connections;
io_info.events = events;
+ accept_connect = FALSE;
if ((listen_event = WSACreateEvent()) == WSA_INVALID_EVENT) {
__db_err(env, net_errno, DB_STR("3590",
"can't create event for listen socket"));
return (net_errno);
}
- if (!IS_SUBORDINATE(db_rep) &&
- WSAEventSelect(db_rep->listen_fd, listen_event, FD_ACCEPT) ==
- SOCKET_ERROR) {
- ret = net_errno;
- __db_err(env, ret, DB_STR("3591",
- "can't enable event for listener"));
- (void)WSACloseEvent(listen_event);
- goto out;
- }
LOCK_MUTEX(db_rep->mutex);
if ((ret = __repmgr_first_try_connections(env)) != 0)
goto unlock;
for (;;) {
+ /*
+ * Set the event for this process to receive notification of
+ * incoming connections if this process is or has just taken
+ * over as the listener process.
+ */
+ if (!IS_SUBORDINATE(db_rep) && !accept_connect) {
+ if (WSAEventSelect(db_rep->listen_fd, listen_event,
+ FD_ACCEPT) == SOCKET_ERROR) {
+ ret = net_errno;
+ __db_err(env, ret, DB_STR("3700",
+ "can't enable event for listener"));
+ (void)WSACloseEvent(listen_event);
+ goto out;
+ }
+ accept_connect = TRUE;
+ }
+
/* Start with the two events that we always wait for. */
#define SIGNALER_INDEX 0
#define LISTENER_INDEX 1
@@ -714,6 +723,8 @@ __repmgr_select_loop(env)
ret = net_errno;
goto unlock;
}
+ if (net_events.lNetworkEvents == 0)
+ continue;
DB_ASSERT(env,
net_events.lNetworkEvents & FD_ACCEPT);
if ((ret = net_events.iErrorCode[FD_ACCEPT_BIT])
@@ -815,7 +826,16 @@ handle_completion(env, conn)
/* Check both writing and reading. */
if (events.lNetworkEvents & FD_CLOSE) {
error = events.iErrorCode[FD_CLOSE_BIT];
- goto report;
+
+ /*
+ * There could be data for reading when we see FD_CLOSE,
+ * so we should try reading in this case.
+ */
+ if (error != 0)
+ goto report;
+ else if ((ret =
+ __repmgr_read_from_site(env, conn)) != 0)
+ goto err;
}
if (events.lNetworkEvents & FD_WRITE) {
@@ -823,7 +843,7 @@ handle_completion(env, conn)
error = events.iErrorCode[FD_WRITE_BIT];
goto report;
} else if ((ret =
- __repmgr_write_some(env, conn)) != 0)
+ __repmgr_write_some(env, conn)) != 0)
goto err;
}
@@ -832,7 +852,7 @@ handle_completion(env, conn)
error = events.iErrorCode[FD_READ_BIT];
goto report;
} else if ((ret =
- __repmgr_read_from_site(env, conn)) != 0)
+ __repmgr_read_from_site(env, conn)) != 0)
goto err;
}