diff options
Diffstat (limited to 'src/repmgr')
| -rw-r--r-- | src/repmgr/repmgr.msg | 44 | ||||
| -rw-r--r-- | src/repmgr/repmgr.src | 2 | ||||
| -rw-r--r-- | src/repmgr/repmgr_automsg.c | 206 | ||||
| -rw-r--r-- | src/repmgr/repmgr_elect.c | 214 | ||||
| -rw-r--r-- | src/repmgr/repmgr_method.c | 954 | ||||
| -rw-r--r-- | src/repmgr/repmgr_msg.c | 655 | ||||
| -rw-r--r-- | src/repmgr/repmgr_net.c | 186 | ||||
| -rw-r--r-- | src/repmgr/repmgr_posix.c | 2 | ||||
| -rw-r--r-- | src/repmgr/repmgr_queue.c | 132 | ||||
| -rw-r--r-- | src/repmgr/repmgr_rec.c | 10 | ||||
| -rw-r--r-- | src/repmgr/repmgr_sel.c | 726 | ||||
| -rw-r--r-- | src/repmgr/repmgr_stat.c | 74 | ||||
| -rw-r--r-- | src/repmgr/repmgr_stub.c | 74 | ||||
| -rw-r--r-- | src/repmgr/repmgr_util.c | 957 | ||||
| -rw-r--r-- | src/repmgr/repmgr_windows.c | 50 |
15 files changed, 3733 insertions, 553 deletions
diff --git a/src/repmgr/repmgr.msg b/src/repmgr/repmgr.msg index 020f2e9c..ba544936 100644 --- a/src/repmgr/repmgr.msg +++ b/src/repmgr/repmgr.msg @@ -65,6 +65,11 @@ ARG port u_int16_t END BEGIN_MSG membership_data +ARG status u_int32_t +ARG flags u_int32_t +END + +BEGIN_MSG v4membership_data ARG flags u_int32_t END @@ -98,22 +103,51 @@ BEGIN_MSG membr_vers ARG version u_int32_t ARG gen u_int32_t END + BEGIN_MSG site_info check_length ARG host DBT ARG port u_int16_t +ARG status u_int32_t +ARG flags u_int32_t +END + +BEGIN_MSG v4site_info check_length +ARG host DBT +ARG port u_int16_t ARG flags u_int32_t END /* * If site A breaks or rejects a connection from site B, it first * tries to send B this message containing site A's currently known - * membership DB version. Site B can use this to decide what to do. - * If site B knows of a later version, it should retry the connection - * to site A later, polling at it until site A catches up. However, if - * site B's known version is less, it means that site B is no longer in - * the group, and so instead it should shut down and notify the application. + * membership DB version and site B's status in site A's membership DB. + * Site B can use them to decide what to do. If site B knows of a later + * version, it should retry the connection to site A later, polling + * until site A catches up. However, if site B's known version is + * less and site B's status is adding in site A's membership DB, it + * means that a badly-timed change of master may have caused the current + * master to lose B's membership DB update to present, so it should + * retry the connection to site A later, otherwise, site B is no longer + * in the group and it should shut down and notify the application. */ BEGIN_MSG connect_reject ARG version u_int32_t ARG gen u_int32_t +ARG status u_int32_t +END + +BEGIN_MSG v4connect_reject +ARG version u_int32_t +ARG gen u_int32_t +END + +/* + * For preferred master LSN history comparison between the sites. + * The next_gen_lsn is [0,0] if the next generation doesn't yet exist. + */ +BEGIN_MSG lsnhist_match +ARG lsn DB_LSN +ARG hist_sec u_int32_t +ARG hist_nsec u_int32_t +ARG next_gen_lsn DB_LSN END diff --git a/src/repmgr/repmgr.src b/src/repmgr/repmgr.src index 68d8c239..f42e159f 100644 --- a/src/repmgr/repmgr.src +++ b/src/repmgr/repmgr.src @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2010, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2010, 2015 Oracle and/or its affiliates. All rights reserved. */ DBPRIVATE diff --git a/src/repmgr/repmgr_automsg.c b/src/repmgr/repmgr_automsg.c index 90af08ff..31bc4c35 100644 --- a/src/repmgr/repmgr_automsg.c +++ b/src/repmgr/repmgr_automsg.c @@ -463,6 +463,7 @@ __repmgr_membership_data_marshal(env, argp, bp) __repmgr_membership_data_args *argp; u_int8_t *bp; { + DB_HTONL_COPYOUT(env, bp, argp->status); DB_HTONL_COPYOUT(env, bp, argp->flags); } @@ -481,6 +482,7 @@ __repmgr_membership_data_unmarshal(env, argp, bp, max, nextp) { if (max < __REPMGR_MEMBERSHIP_DATA_SIZE) goto too_few; + DB_NTOHL_COPYIN(env, argp->status, bp); DB_NTOHL_COPYIN(env, argp->flags, bp); if (nextp != NULL) @@ -494,6 +496,46 @@ too_few: } /* + * PUBLIC: void __repmgr_v4membership_data_marshal __P((ENV *, + * PUBLIC: __repmgr_v4membership_data_args *, u_int8_t *)); + */ +void +__repmgr_v4membership_data_marshal(env, argp, bp) + ENV *env; + __repmgr_v4membership_data_args *argp; + u_int8_t *bp; +{ + DB_HTONL_COPYOUT(env, bp, argp->flags); +} + +/* + * PUBLIC: int __repmgr_v4membership_data_unmarshal __P((ENV *, + * PUBLIC: __repmgr_v4membership_data_args *, u_int8_t *, size_t, + * PUBLIC: u_int8_t **)); + */ +int +__repmgr_v4membership_data_unmarshal(env, argp, bp, max, nextp) + ENV *env; + __repmgr_v4membership_data_args *argp; + u_int8_t *bp; + size_t max; + u_int8_t **nextp; +{ + if (max < __REPMGR_V4MEMBERSHIP_DATA_SIZE) + goto too_few; + DB_NTOHL_COPYIN(env, argp->flags, bp); + + if (nextp != NULL) + *nextp = bp; + return (0); + +too_few: + __db_errx(env, DB_STR("3675", + "Not enough input bytes to fill a __repmgr_v4membership_data message")); + return (EINVAL); +} + +/* * PUBLIC: void __repmgr_member_metadata_marshal __P((ENV *, * PUBLIC: __repmgr_member_metadata_args *, u_int8_t *)); */ @@ -669,6 +711,7 @@ __repmgr_site_info_marshal(env, argp, bp, max, lenp) bp += argp->host.size; } DB_HTONS_COPYOUT(env, bp, argp->port); + DB_HTONL_COPYOUT(env, bp, argp->status); DB_HTONL_COPYOUT(env, bp, argp->flags); *lenp = (size_t)(bp - start); @@ -702,6 +745,7 @@ __repmgr_site_info_unmarshal(env, argp, bp, max, nextp) goto too_few; bp += argp->host.size; DB_NTOHS_COPYIN(env, argp->port, bp); + DB_NTOHL_COPYIN(env, argp->status, bp); DB_NTOHL_COPYIN(env, argp->flags, bp); if (nextp != NULL) @@ -715,6 +759,75 @@ too_few: } /* + * PUBLIC: int __repmgr_v4site_info_marshal __P((ENV *, + * PUBLIC: __repmgr_v4site_info_args *, u_int8_t *, size_t, size_t *)); + */ +int +__repmgr_v4site_info_marshal(env, argp, bp, max, lenp) + ENV *env; + __repmgr_v4site_info_args *argp; + u_int8_t *bp; + size_t *lenp, max; +{ + u_int8_t *start; + + if (max < __REPMGR_V4SITE_INFO_SIZE + + (size_t)argp->host.size) + return (ENOMEM); + start = bp; + + DB_HTONL_COPYOUT(env, bp, argp->host.size); + if (argp->host.size > 0) { + memcpy(bp, argp->host.data, argp->host.size); + bp += argp->host.size; + } + DB_HTONS_COPYOUT(env, bp, argp->port); + DB_HTONL_COPYOUT(env, bp, argp->flags); + + *lenp = (size_t)(bp - start); + return (0); +} + +/* + * PUBLIC: int __repmgr_v4site_info_unmarshal __P((ENV *, + * PUBLIC: __repmgr_v4site_info_args *, u_int8_t *, size_t, u_int8_t **)); + */ +int +__repmgr_v4site_info_unmarshal(env, argp, bp, max, nextp) + ENV *env; + __repmgr_v4site_info_args *argp; + u_int8_t *bp; + size_t max; + u_int8_t **nextp; +{ + size_t needed; + + needed = __REPMGR_V4SITE_INFO_SIZE; + if (max < needed) + goto too_few; + DB_NTOHL_COPYIN(env, argp->host.size, bp); + if (argp->host.size == 0) + argp->host.data = NULL; + else + argp->host.data = bp; + needed += (size_t)argp->host.size; + if (max < needed) + goto too_few; + bp += argp->host.size; + DB_NTOHS_COPYIN(env, argp->port, bp); + DB_NTOHL_COPYIN(env, argp->flags, bp); + + if (nextp != NULL) + *nextp = bp; + return (0); + +too_few: + __db_errx(env, DB_STR("3675", + "Not enough input bytes to fill a __repmgr_v4site_info message")); + return (EINVAL); +} + +/* * PUBLIC: void __repmgr_connect_reject_marshal __P((ENV *, * PUBLIC: __repmgr_connect_reject_args *, u_int8_t *)); */ @@ -726,6 +839,7 @@ __repmgr_connect_reject_marshal(env, argp, bp) { DB_HTONL_COPYOUT(env, bp, argp->version); DB_HTONL_COPYOUT(env, bp, argp->gen); + DB_HTONL_COPYOUT(env, bp, argp->status); } /* @@ -744,6 +858,7 @@ __repmgr_connect_reject_unmarshal(env, argp, bp, max, nextp) goto too_few; DB_NTOHL_COPYIN(env, argp->version, bp); DB_NTOHL_COPYIN(env, argp->gen, bp); + DB_NTOHL_COPYIN(env, argp->status, bp); if (nextp != NULL) *nextp = bp; @@ -755,3 +870,94 @@ too_few: return (EINVAL); } +/* + * PUBLIC: void __repmgr_v4connect_reject_marshal __P((ENV *, + * PUBLIC: __repmgr_v4connect_reject_args *, u_int8_t *)); + */ +void +__repmgr_v4connect_reject_marshal(env, argp, bp) + ENV *env; + __repmgr_v4connect_reject_args *argp; + u_int8_t *bp; +{ + DB_HTONL_COPYOUT(env, bp, argp->version); + DB_HTONL_COPYOUT(env, bp, argp->gen); +} + +/* + * PUBLIC: int __repmgr_v4connect_reject_unmarshal __P((ENV *, + * PUBLIC: __repmgr_v4connect_reject_args *, u_int8_t *, size_t, + * PUBLIC: u_int8_t **)); + */ +int +__repmgr_v4connect_reject_unmarshal(env, argp, bp, max, nextp) + ENV *env; + __repmgr_v4connect_reject_args *argp; + u_int8_t *bp; + size_t max; + u_int8_t **nextp; +{ + if (max < __REPMGR_V4CONNECT_REJECT_SIZE) + goto too_few; + DB_NTOHL_COPYIN(env, argp->version, bp); + DB_NTOHL_COPYIN(env, argp->gen, bp); + + if (nextp != NULL) + *nextp = bp; + return (0); + +too_few: + __db_errx(env, DB_STR("3675", + "Not enough input bytes to fill a __repmgr_v4connect_reject message")); + return (EINVAL); +} + +/* + * PUBLIC: void __repmgr_lsnhist_match_marshal __P((ENV *, + * PUBLIC: __repmgr_lsnhist_match_args *, u_int8_t *)); + */ +void +__repmgr_lsnhist_match_marshal(env, argp, bp) + ENV *env; + __repmgr_lsnhist_match_args *argp; + u_int8_t *bp; +{ + DB_HTONL_COPYOUT(env, bp, argp->lsn.file); + DB_HTONL_COPYOUT(env, bp, argp->lsn.offset); + DB_HTONL_COPYOUT(env, bp, argp->hist_sec); + DB_HTONL_COPYOUT(env, bp, argp->hist_nsec); + DB_HTONL_COPYOUT(env, bp, argp->next_gen_lsn.file); + DB_HTONL_COPYOUT(env, bp, argp->next_gen_lsn.offset); +} + +/* + * PUBLIC: int __repmgr_lsnhist_match_unmarshal __P((ENV *, + * PUBLIC: __repmgr_lsnhist_match_args *, u_int8_t *, size_t, u_int8_t **)); + */ +int +__repmgr_lsnhist_match_unmarshal(env, argp, bp, max, nextp) + ENV *env; + __repmgr_lsnhist_match_args *argp; + u_int8_t *bp; + size_t max; + u_int8_t **nextp; +{ + if (max < __REPMGR_LSNHIST_MATCH_SIZE) + goto too_few; + DB_NTOHL_COPYIN(env, argp->lsn.file, bp); + DB_NTOHL_COPYIN(env, argp->lsn.offset, bp); + DB_NTOHL_COPYIN(env, argp->hist_sec, bp); + DB_NTOHL_COPYIN(env, argp->hist_nsec, bp); + DB_NTOHL_COPYIN(env, argp->next_gen_lsn.file, bp); + DB_NTOHL_COPYIN(env, argp->next_gen_lsn.offset, bp); + + if (nextp != NULL) + *nextp = bp; + return (0); + +too_few: + __db_errx(env, DB_STR("3675", + "Not enough input bytes to fill a __repmgr_lsnhist_match message")); + return (EINVAL); +} + diff --git a/src/repmgr/repmgr_elect.c b/src/repmgr/repmgr_elect.c index 3a84694a..15a2de7b 100644 --- a/src/repmgr/repmgr_elect.c +++ b/src/repmgr/repmgr_elect.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2005, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2005, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -12,9 +12,9 @@ static db_timeout_t __repmgr_compute_response_time __P((ENV *)); static int __repmgr_elect __P((ENV *, u_int32_t, db_timespec *)); -static int __repmgr_elect_main __P((ENV *, REPMGR_RUNNABLE *)); +static int __repmgr_elect_main __P((ENV *, + DB_THREAD_INFO *, REPMGR_RUNNABLE *)); static void *__repmgr_elect_thread __P((void *)); -static int send_membership __P((ENV *)); /* * Starts an election thread. @@ -90,26 +90,39 @@ __repmgr_elect_thread(argsp) { REPMGR_RUNNABLE *th; ENV *env; + DB_THREAD_INFO *ip; int ret; th = argsp; env = th->env; + ip = NULL; + ret = 0; - RPRINT(env, (env, DB_VERB_REPMGR_MISC, "starting election thread")); + ENV_ENTER_RET(env, ip, ret); + if (ret == 0) + RPRINT(env, (env, + DB_VERB_REPMGR_MISC, "starting election thread")); - if ((ret = __repmgr_elect_main(env, th)) != 0) { + if (ret != 0 || (ret = __repmgr_elect_main(env, ip, th)) != 0) { __db_err(env, ret, "election thread failed"); + RPRINT(env, (env, + DB_VERB_REPMGR_MISC, "election thread is exiting")); + ENV_LEAVE(env, ip); (void)__repmgr_thread_failure(env, ret); } - - RPRINT(env, (env, DB_VERB_REPMGR_MISC, "election thread is exiting")); + if (ret == 0) { + RPRINT(env, (env, + DB_VERB_REPMGR_MISC, "election thread is exiting")); + ENV_LEAVE(env, ip); + } th->finished = TRUE; return (NULL); } static int -__repmgr_elect_main(env, th) +__repmgr_elect_main(env, ip, th) ENV *env; + DB_THREAD_INFO *ip; REPMGR_RUNNABLE *th; { DB_REP *db_rep; @@ -123,10 +136,13 @@ __repmgr_elect_main(env, th) db_timespec failtime, now, repstart_time, target, wait_til; db_timeout_t delay_time, response_time, tmp_time; u_long sec, usec; - u_int32_t flags; - int done_repstart, ret, suppress_election; + u_int32_t flags, max_tries, tries; + int client_detected, done_repstart, lsnhist_match, master_detected; + int ret, suppress_election; enum { ELECTION, REPSTART } action; + COMPQUIET(usec, 0); + COMPQUIET(max_tries, 0); COMPQUIET(action, ELECTION); db_rep = env->rep_handle; @@ -181,6 +197,120 @@ __repmgr_elect_main(env, th) UNLOCK_MUTEX(db_rep->mutex); /* + * In preferred master mode, the select thread signals when a + * client has lost its connection to the master via prefmas_pending, + * but the actual restart as temporary master is done here in an + * election thread. + */ + if (IS_PREFMAS_MODE(env) && F_ISSET(rep, REP_F_CLIENT) && + db_rep->prefmas_pending == start_temp_master) { + db_rep->prefmas_pending = no_action; + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "elect_main preferred master restart temp master")); + ret = __repmgr_become_master(env, 0); + goto out; + } + + /* Get preferred master wait limits for detecting the other site. */ + if (IS_PREFMAS_MODE(env) && + (ret = __repmgr_prefmas_get_wait(env, &max_tries, &usec)) != 0) + goto out; + + /* Preferred master mode master site start-up. */ + if (IS_PREFMAS_MODE(env) && + FLD_ISSET(rep->config, REP_C_PREFMAS_MASTER) && + LF_ISSET(ELECT_F_STARTUP)) { + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "elect_main preferred master site startup")); + client_detected = FALSE; + lsnhist_match = FALSE; + tries = 0; + while (!client_detected && tries < max_tries) { + __os_yield(env, 0, usec); + tries++; + client_detected = __repmgr_prefmas_connected(env); + } + if (client_detected) { + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "elect_main preferred master client detected")); + /* + * Restart remote site as a client. Depending on the + * outcome of lsnhist_match below, this site will + * either restart as master or it will start an + * election. In either case, the remote site should + * be running as a client. + * + * Then perform the lsnhist_match comparison. + */ + if ((ret = __repmgr_restart_site_as_client( + env, 1)) != 0 || + (ret = __repmgr_lsnhist_match(env, + ip, 1, &lsnhist_match)) != 0) + goto out; + /* + * An lsnhist_match means that we have a continuous + * set of transactions and it is safe to call a + * comparison election to preserve any temporary master + * transactions that were committed while this site + * was down. + */ + if (lsnhist_match) { + F_CLR(rep, REP_F_HOLD_GEN); + LF_SET(ELECT_F_IMMED); + LF_CLR(ELECT_F_STARTUP); + /* Continue on to election code below. */ + } + } + /* + * If we didn't detect a client within a reasonable time or + * we failed the lsnhist_match (meaning we have conflicting + * sets of transactions), we start this site as a master and + * possibly force rollback of temporary master transactions. + */ + if (!client_detected || !lsnhist_match) { + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "elect_main preferred master site start master")); + ret = __repmgr_become_master(env, 0); + F_CLR(rep, REP_F_HOLD_GEN); + goto out; + } + } + + /* Preferred master mode client site start-up. */ + if (IS_PREFMAS_MODE(env) && + FLD_ISSET(rep->config, REP_C_PREFMAS_CLIENT) && + LF_ISSET(ELECT_F_STARTUP)) { + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "elect_main preferred master client site startup")); + master_detected = FALSE; + tries = 0; + while (!master_detected && tries < max_tries) { + __os_yield(env, 0, usec); + tries++; + master_detected = __repmgr_prefmas_connected(env); + } + /* + * If we find the master, restart as client here so that we + * send a newclient message after we are connected to the + * master. The master will send a newmaster message so that + * we can start the client sync process. + * + * If we haven't found the master after the timeout, start as + * temporary master. + */ + if (master_detected) { + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "elect_main preferred master detected")); + ret = __repmgr_become_client(env); + } else { + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "elect_main preferred master client start master")); + ret = __repmgr_become_master(env, 0); + } + goto out; + } + + /* * The 'done_repstart' flag keeps track of which was our most recent * operation (repstart or election), so that we can alternate * appropriately. There are a few different ways this thread can be @@ -188,7 +318,7 @@ __repmgr_elect_main(env, th) * called. The one exception is at initial start-up, where we * first probe for a master by sending out rep_start(CLIENT) calls. */ - if (LF_ISSET(ELECT_F_IMMED)) { + if (LF_ISSET(ELECT_F_IMMED) && !IS_VIEW_SITE(env)) { /* * When the election succeeds, we've successfully completed * everything we need to do. If it fails in an unexpected way, @@ -256,11 +386,13 @@ __repmgr_elect_main(env, th) /* * See if it's time to retry the operation. Normally it's an * election we're interested in retrying. But we refrain from - * calling for elections if so configured. + * calling for elections if so configured or we are a view. */ - suppress_election = LF_ISSET(ELECT_F_STARTUP) ? + suppress_election = IS_VIEW_SITE(env) || + (LF_ISSET(ELECT_F_STARTUP) ? db_rep->init_policy == DB_REP_CLIENT : - !FLD_ISSET(rep->config, REP_C_ELECTIONS); + !FLD_ISSET(rep->config, REP_C_ELECTIONS)) || + LF_ISSET(ELECT_F_CLIENT_RESTART); repstart_time = db_rep->repstart_time; target = suppress_election ? repstart_time : failtime; TIMESPEC_ADD_DB_TIMEOUT(&target, rep->election_retry_wait); @@ -343,7 +475,8 @@ __repmgr_elect_main(env, th) DB_ASSERT(env, action == REPSTART); db_rep->new_connection = FALSE; - if ((ret = __repmgr_repstart(env, DB_REP_CLIENT)) != 0) + if ((ret = __repmgr_repstart(env, + DB_REP_CLIENT, 0)) != 0) goto out; done_repstart = TRUE; @@ -476,7 +609,20 @@ __repmgr_elect(env, flags, failtimep) case DB_REP_UNAVAIL: __os_gettime(env, failtimep, 1); DB_EVENT(env, DB_EVENT_REP_ELECTION_FAILED, NULL); - if ((t_ret = send_membership(env)) != 0) + /* + * If an election fails with DB_REP_UNAVAIL, it could be + * because a participating site has an obsolete, too-high + * notion of the group size. (This could happen if the site + * was down/disconnected during removal of some (other) sites.) + * To remedy this, broadcast a current copy of the membership + * list. Since all sites are doing this, and we always ratchet + * to the most up-to-date version, this should bring all sites + * up to date. We only do this after a failure, during what + * will normally be an idle period anyway, so that we don't + * slow down a first election following the loss of an active + * master. + */ + if ((t_ret = __repmgr_bcast_member_list(env)) != 0) ret = t_ret; break; @@ -498,40 +644,6 @@ __repmgr_elect(env, flags, failtimep) } /* - * If an election fails with DB_REP_UNAVAIL, it could be because a participating - * site has an obsolete, too-high notion of the group size. (This could happen - * if the site was down/disconnected during removal of some (other) sites.) To - * remedy this, broadcast a current copy of the membership list. Since all - * sites are doing this, and we always ratchet to the most up-to-date version, - * this should bring all sites up to date. We only do this after a failure, - * during what will normally be an idle period anyway, so that we don't slow - * down a first election following the loss of an active master. - */ -static int -send_membership(env) - ENV *env; -{ - DB_REP *db_rep; - u_int8_t *buf; - size_t len; - int ret; - - db_rep = env->rep_handle; - buf = NULL; - LOCK_MUTEX(db_rep->mutex); - if ((ret = __repmgr_marshal_member_list(env, &buf, &len)) != 0) - goto out; - RPRINT(env, (env, DB_VERB_REPMGR_MISC, - "Broadcast latest membership list")); - ret = __repmgr_bcast_own_msg(env, REPMGR_SHARING, buf, len); -out: - UNLOCK_MUTEX(db_rep->mutex); - if (buf != NULL) - __os_free(env, buf); - return (ret); -} - -/* * Becomes master after we've won an election, if we can. * * PUBLIC: int __repmgr_claim_victory __P((ENV *)); @@ -543,7 +655,7 @@ __repmgr_claim_victory(env) int ret; env->rep_handle->takeover_pending = FALSE; - if ((ret = __repmgr_become_master(env)) == DB_REP_UNAVAIL) { + if ((ret = __repmgr_become_master(env, 0)) == DB_REP_UNAVAIL) { ret = 0; RPRINT(env, (env, DB_VERB_REPMGR_MISC, "Won election but lost race with DUPMASTER client intent")); diff --git a/src/repmgr/repmgr_method.c b/src/repmgr/repmgr_method.c index 229cf650..729ba5ff 100644 --- a/src/repmgr/repmgr_method.c +++ b/src/repmgr/repmgr_method.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2005, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2005, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -29,19 +29,17 @@ static int get_channel_connection __P((CHANNEL *, REPMGR_CONNECTION **)); static int init_dbsite __P((ENV *, int, const char *, u_int, DB_SITE **)); static int join_group_at_site __P((ENV *, repmgr_netaddr_t *)); static int kick_blockers __P((ENV *, REPMGR_CONNECTION *, void *)); -static int make_request_conn __P((ENV *, - repmgr_netaddr_t *, REPMGR_CONNECTION **)); static int set_local_site __P((DB_SITE *, u_int32_t)); -static int read_own_msg __P((ENV *, - REPMGR_CONNECTION *, u_int32_t *, u_int8_t **, size_t *)); static int refresh_site __P((DB_SITE *)); static int __repmgr_await_threads __P((ENV *)); static int __repmgr_build_data_out __P((ENV *, DBT *, u_int32_t, __repmgr_msg_metadata_args *, REPMGR_IOVECS **iovecsp)); static int __repmgr_build_msg_out __P((ENV *, DBT *, u_int32_t, __repmgr_msg_metadata_args *, REPMGR_IOVECS **iovecsp)); +static int __repmgr_demote_site(ENV *, int); static int repmgr_only __P((ENV *, const char *)); static int __repmgr_restart __P((ENV *, int, u_int32_t)); +static int __repmgr_remove_and_close_site __P((DB_SITE *)); static int __repmgr_remove_site __P((DB_SITE *)); static int __repmgr_remove_site_pp __P((DB_SITE *)); static int __repmgr_start_msg_threads __P((ENV *, u_int)); @@ -52,25 +50,21 @@ static int send_msg_self __P((ENV *, REPMGR_IOVECS *, u_int32_t)); static int site_by_addr __P((ENV *, const char *, u_int, DB_SITE **)); /* - * PUBLIC: int __repmgr_start __P((DB_ENV *, int, u_int32_t)); + * PUBLIC: int __repmgr_start_pp __P((DB_ENV *, int, u_int32_t)); */ int -__repmgr_start(dbenv, nthreads, flags) +__repmgr_start_pp(dbenv, nthreads, flags) DB_ENV *dbenv; int nthreads; u_int32_t flags; { DB_REP *db_rep; - REP *rep; - REPMGR_SITE *me, *site; - DB_THREAD_INFO *ip; ENV *env; - int first, is_listener, locked, min, need_masterseek, ret, start_master; - u_int i, n; + DB_THREAD_INFO *ip; + int ret; env = dbenv->env; db_rep = env->rep_handle; - rep = db_rep->region; switch (flags) { case 0: @@ -102,7 +96,27 @@ __repmgr_start(dbenv, nthreads, flags) return (EINVAL); } - /* Check if it is a shut-down site, if so, clean the resources. */ + /* A view site cannot be started as MASTER or ELECTION. */ + if (IS_VIEW_SITE(env) && + (flags == DB_REP_MASTER || flags == DB_REP_ELECTION)) { + __db_errx(env, DB_STR("3694", + "A view site must be started with DB_REP_CLIENT")); + return (EINVAL); + } + + /* Must start site as client in preferred master mode. */ + if (PREFMAS_IS_SET(env) && + (flags == DB_REP_MASTER || flags == DB_REP_ELECTION)) { + __db_errx(env, DB_STR("3702", + "A preferred master site must be started with " + "DB_REP_CLIENT")); + return (EINVAL); + } + + /* + * Check if it is a shut-down site, if so, clean the resources and + * reset the status in order to get ready to start replication. + */ if (db_rep->repmgr_status == stopped) { if ((ret = __repmgr_stop(env)) != 0) { __db_errx(env, DB_STR("3638", @@ -112,7 +126,55 @@ __repmgr_start(dbenv, nthreads, flags) db_rep->repmgr_status = ready; } + /* Record the original configurations given by application. */ + ENV_ENTER(env, ip); db_rep->init_policy = flags; + db_rep->config_nthreads = nthreads; + ret = __repmgr_start_int(env, nthreads, flags); + ENV_LEAVE(env, ip); + return (ret); +} + +/* + * Internal processing to start replication manager. + * + * PUBLIC: int __repmgr_start_int __P((ENV *, int, u_int32_t)); + */ +int +__repmgr_start_int(env, nthreads, flags) + ENV *env; + int nthreads; + u_int32_t flags; +{ + DB_LOG *dblp; + DB_REP *db_rep; + LOG *lp; + REP *rep; + REPMGR_SITE *me, *site; + u_int32_t startopts; + int first, flags_error, is_listener, locked, min; + int need_masterseek, ret, start_master; + u_int i, n; + + db_rep = env->rep_handle; + rep = db_rep->region; + dblp = env->lg_handle; + lp = dblp->reginfo.primary; + flags_error = 0; + startopts = 0; + + /* + * For preferred master master site startup, we need to save the + * log location at the end of our previous transactions for + * the lsnhist_match comparisons. Starting repmgr adds a few + * more log records that we don't want to count in lsnhist_match. + */ + if (FLD_ISSET(rep->config, REP_C_PREFMAS_MASTER)) { + LOG_SYSTEM_LOCK(env); + db_rep->prefmas_init_lsn = lp->lsn; + LOG_SYSTEM_UNLOCK(env); + } + if ((ret = __rep_set_transport_int(env, db_rep->self_eid, __repmgr_send)) != 0) return (ret); @@ -128,7 +190,8 @@ __repmgr_start(dbenv, nthreads, flags) if (db_rep->restored_list != NULL) { ret = __repmgr_refresh_membership(env, - db_rep->restored_list, db_rep->restored_list_length); + db_rep->restored_list, db_rep->restored_list_length, + DB_REPMGR_VERSION); __os_free(env, db_rep->restored_list); db_rep->restored_list = NULL; } else { @@ -145,9 +208,15 @@ __repmgr_start(dbenv, nthreads, flags) * join. */ ret = __repmgr_join_group(env); + else if (VIEW_TO_PARTICIPANT(db_rep, me)) { + __db_errx(env, DB_STR("3695", + "A view site must be started with a view callback")); + return (EINVAL); + } } else if (ret == ENOENT) { - ENV_ENTER(env, ip); - if (FLD_ISSET(me->config, DB_GROUP_CREATOR)) + if (FLD_ISSET(me->config, DB_GROUP_CREATOR) || + (IS_PREFMAS_MODE(env) && + FLD_ISSET(rep->config, REP_C_PREFMAS_MASTER))) start_master = TRUE; /* * LEGACY is inconsistent with CREATOR, but start_master @@ -166,10 +235,12 @@ __repmgr_start(dbenv, nthreads, flags) continue; if ((ret = __repmgr_set_membership(env, site->net_addr.host, - site->net_addr.port, - SITE_PRESENT)) != 0) + site->net_addr.port, SITE_PRESENT, + site->gmdb_flags)) != 0) break; - n++; + if (!FLD_ISSET(site->gmdb_flags, + SITE_VIEW)) + n++; } ret = __rep_set_nsites_int(env, n); DB_ASSERT(env, ret == 0); @@ -180,30 +251,27 @@ __repmgr_start(dbenv, nthreads, flags) db_rep->member_version_gen = 1; if ((ret = __repmgr_set_membership(env, me->net_addr.host, me->net_addr.port, - SITE_PRESENT)) == 0) { + SITE_PRESENT, 0)) == 0) { ret = __rep_set_nsites_int(env, 1); DB_ASSERT(env, ret == 0); } UNLOCK_MUTEX(db_rep->mutex); } else ret = __repmgr_join_group(env); - ENV_LEAVE(env, ip); } else if (ret == DB_DELETED) ret = DB_REP_UNAVAIL; } if (ret != 0) return (ret); - DB_ASSERT(env, start_master || - SITE_FROM_EID(db_rep->self_eid)->membership == SITE_PRESENT); - /* - * If we're the first repmgr_start() call, we will have to start threads. - * Therefore, we require a flags value (to tell us how). + * Catch case where user defines a different local site address than + * the one in the restored_list from an ongoing internal init. */ - if (db_rep->repmgr_status != running && flags == 0) { - __db_errx(env, DB_STR("3639", - "a non-zero flags value is required for initial repmgr_start() call")); + if (!start_master && + SITE_FROM_EID(db_rep->self_eid)->membership != SITE_PRESENT) { + __db_errx(env, DB_STR("3696", + "Current local site conflicts with earlier definition")); return (EINVAL); } @@ -214,37 +282,54 @@ __repmgr_start(dbenv, nthreads, flags) * * Then, in case there could be multiple processes, we're either the * main listener process or a subordinate process. On a "subsequent" - * repmgr_start() call we already have enough information to know which - * it is. Otherwise, negotiate with information in the shared region to - * claim the listener role if possible. + * repmgr_start() call, with a running main listener process, we already + * have enough information to know which it is. Otherwise, if there is + * no listener, negotiate with information in the shared region to claim + * the listener role if possible. Once we decide we're the listener, + * mark the listener id in the shared region, so that no other process + * thinks the same thing. * * To avoid a race, once we decide we're in the first call, mark the * handle as started, so that no other thread thinks the same thing. */ + first = FALSE; + is_listener = FALSE; LOCK_MUTEX(db_rep->mutex); locked = TRUE; - if (db_rep->repmgr_status == running) { - first = FALSE; + if (db_rep->repmgr_status == running && !(rep->listener == 0 && + FLD_ISSET(rep->config, REP_C_AUTOTAKEOVER))) is_listener = !IS_SUBORDINATE(db_rep); - } else { + else if (db_rep->repmgr_status != running && + rep->listener == 0 && flags == 0) + flags_error = 1; + else { first = TRUE; db_rep->repmgr_status = running; - ENV_ENTER(env, ip); MUTEX_LOCK(env, rep->mtx_repmgr); if (rep->listener == 0) { is_listener = TRUE; - __os_id(dbenv, &rep->listener, NULL); - } else { - is_listener = FALSE; + __os_id(env->dbenv, &rep->listener, NULL); + } else nthreads = 0; - } MUTEX_UNLOCK(env, rep->mtx_repmgr); - ENV_LEAVE(env, ip); } UNLOCK_MUTEX(db_rep->mutex); locked = FALSE; + /* + * The first repmgr_start() call for the main listener process + * requires a flags value to tell us how to start up the site. + * But we don't require a flags value for the repmgr_start() + * call for a subordinate process because the site is already + * started and we would only ignore the value anyway. + */ + if (flags_error) { + __db_errx(env, DB_STR("3639", + "A non-zero flags value is required for initial repmgr_start() call")); + return (EINVAL); + } + if (!first) { /* * Subsequent call is allowed when ELECTIONS are turned off, so @@ -266,7 +351,7 @@ __repmgr_start(dbenv, nthreads, flags) /* * The minimum legal number of threads is either 1 or 0, depending upon - * whether we're the main process or a subordinate. + * whether we're the listener process or a subordinate. */ min = is_listener ? 1 : 0; if (nthreads < min) { @@ -303,14 +388,24 @@ __repmgr_start(dbenv, nthreads, flags) * of rep_start calls even within an env region lifetime. */ if (start_master) { - ret = __repmgr_become_master(env); + ret = __repmgr_become_master(env, 0); /* No other repmgr threads running yet. */ DB_ASSERT(env, ret != DB_REP_UNAVAIL); if (ret != 0) goto err; need_masterseek = FALSE; } else { - if ((ret = __repmgr_repstart(env, DB_REP_CLIENT)) != 0) + /* + * The preferred master site cannot allow its gen + * to change until it has done its lsnhist_match to + * guarantee that no preferred master transactions + * will be rolled back. + */ + if (IS_PREFMAS_MODE(env) && + FLD_ISSET(rep->config, REP_C_PREFMAS_MASTER)) + startopts = REP_START_HOLD_CLIGEN; + if ((ret = __repmgr_repstart(env, + DB_REP_CLIENT, startopts)) != 0) goto err; /* * The repmgr election code starts elections only if @@ -352,6 +447,7 @@ __repmgr_start(dbenv, nthreads, flags) if ((ret = __repmgr_start_msg_threads(env, (u_int)nthreads)) != 0) goto err; + rep->listener_nthreads = (u_int)nthreads; if (need_masterseek) { /* @@ -374,10 +470,47 @@ __repmgr_start(dbenv, nthreads, flags) } UNLOCK_MUTEX(db_rep->mutex); locked = FALSE; + /* + * Turn on the DB_EVENT_REP_INQUEUE_FULL event firing. We only + * do this for the main listener process. For a subordinate + * process, it is always turned on. + */ + rep->inqueue_full_event_on = 1; + } + if (db_rep->selector == NULL) { + /* All processes (even non-listeners) need a select() thread. */ + if ((ret = __repmgr_start_selector(env)) == 0) { + /* + * A view callback is set but this site isn't yet a + * view in the internal site list. Do the view + * demotion here, which will update the internal + * site list. We need the select() thread for the + * demotion because the demotion performs gmdb + * operations. + */ + if (PARTICIPANT_TO_VIEW(db_rep, + SITE_FROM_EID(db_rep->self_eid)) && + (ret = __repmgr_demote_site(env, + db_rep->self_eid)) != 0) + goto err; + return (is_listener ? 0 : DB_REP_IGNORE); + } + } else { + /* + * If the selector thread already exists, the current process + * should be the new listener which has just finished a + * takeover. Now, all active connections need to be refreshed + * to notify remote sites about the new listener. If a new + * connection is established immediately, disable the existing + * main connection to the same site. Otherwise, schedule a + * second immediate attempt. If it still fails, disable the + * main connection and retry a connection as usual. + */ + DB_ASSERT(env, is_listener && + FLD_ISSET(rep->config, REP_C_AUTOTAKEOVER)); + if ((ret = __repmgr_refresh_selector(env)) == 0) + return (0); } - /* All processes (even non-listeners) need a select() thread. */ - if ((ret = __repmgr_start_selector(env)) == 0) - return (is_listener ? 0 : DB_REP_IGNORE); err: /* If we couldn't succeed at everything, undo the parts we did do. */ @@ -392,6 +525,16 @@ err: if (!locked) LOCK_MUTEX(db_rep->mutex); (void)__repmgr_net_close(env); + /* Reset the listener when we fail before having a valid listen_fd. */ + if (first && is_listener) + rep->listener = 0; + /* + * Reset repmgr_status when we fail before starting a selector if the + * earlier call to __repmgr_stop_threads() hasn't already reset it to + * stopped. + */ + if (db_rep->repmgr_status == running) + db_rep->repmgr_status = ready; UNLOCK_MUTEX(db_rep->mutex); return (ret); } @@ -425,6 +568,53 @@ __repmgr_valid_config(env, flags) } /* + * Set priority, heartbeat and election_retry timeouts for preferred master + * mode. Turn on 2SITE_STRICT and ELECTIONS. Can be called whether or not + * REP_ON() is true + * + * PUBLIC: int __repmgr_prefmas_auto_config __P((DB_ENV *, u_int32_t *)); + */ +int __repmgr_prefmas_auto_config (dbenv, config_flags) + DB_ENV *dbenv; + u_int32_t *config_flags; +{ + ENV * env; + db_timeout_t timeout; + int ret; + + env = dbenv->env; + timeout = 0; + + /* Change heartbeat timeouts if they are not already set. */ + if ((ret = __rep_get_timeout(dbenv, + DB_REP_HEARTBEAT_MONITOR, &timeout)) == 0 && + timeout == 0 && (ret = __rep_set_timeout_int(env, + DB_REP_HEARTBEAT_MONITOR, + DB_REPMGR_PREFMAS_HEARTBEAT_MONITOR)) != 0) + return (ret); + if ((ret = __rep_get_timeout(dbenv, + DB_REP_HEARTBEAT_SEND, &timeout)) == 0 && + timeout == 0 && (ret = __rep_set_timeout_int(env, + DB_REP_HEARTBEAT_SEND, DB_REPMGR_PREFMAS_HEARTBEAT_SEND)) != 0) + return (ret); + + /* Change election_retry timeout if it is still the default value. */ + if ((ret = __rep_get_timeout(dbenv, + DB_REP_ELECTION_RETRY, &timeout)) == 0 && + timeout == DB_REPMGR_DEFAULT_ELECTION_RETRY && + (ret = __rep_set_timeout_int(env, + DB_REP_ELECTION_RETRY, DB_REPMGR_PREFMAS_ELECTION_RETRY)) != 0) + return (ret); + + if ((ret = __rep_set_priority_int(env, FLD_ISSET(*config_flags, + REP_C_PREFMAS_MASTER) ? DB_REPMGR_PREFMAS_PRIORITY_MASTER : + DB_REPMGR_PREFMAS_PRIORITY_CLIENT)) != 0) + return (ret); + FLD_SET(*config_flags, REP_C_ELECTIONS | REP_C_2SITE_STRICT); + return (0); +} + +/* * Starts message processing threads. On entry, the actual number of threads * already active is db_rep->nthreads; the desired number of threads is passed * as "n". @@ -473,7 +663,7 @@ __repmgr_restart(env, nthreads, flags) REP *rep; REPMGR_RUNNABLE **th; u_int32_t cur_repflags; - int locked, ret, t_ret; + int locked, ret, role_change, t_ret; u_int delta, i, min, nth; th = NULL; @@ -491,6 +681,7 @@ __repmgr_restart(env, nthreads, flags) } ret = 0; + role_change = 0; db_rep = env->rep_handle; DB_ASSERT(env, REP_ON(env)); rep = db_rep->region; @@ -498,11 +689,14 @@ __repmgr_restart(env, nthreads, flags) cur_repflags = F_ISSET(rep, REP_F_MASTER | REP_F_CLIENT); DB_ASSERT(env, cur_repflags); if (FLD_ISSET(cur_repflags, REP_F_MASTER) && - flags == DB_REP_CLIENT) + flags == DB_REP_CLIENT) { ret = __repmgr_become_client(env); - else if (FLD_ISSET(cur_repflags, REP_F_CLIENT) && - flags == DB_REP_MASTER) - ret = __repmgr_become_master(env); + role_change = 1; + } else if (FLD_ISSET(cur_repflags, REP_F_CLIENT) && + flags == DB_REP_MASTER) { + ret = __repmgr_become_master(env, 0); + role_change = 1; + } if (ret != 0) return (ret); @@ -574,6 +768,9 @@ __repmgr_restart(env, nthreads, flags) } __os_free(env, th); } + /* We will always turn on the inqueue full event after role change. */ + if (role_change) + rep->inqueue_full_event_on = 1; out: if (locked) UNLOCK_MUTEX(db_rep->mutex); @@ -668,7 +865,8 @@ __repmgr_start_selector(env) * PUBLIC: int __repmgr_close __P((ENV *)); * * Close repmgr during env close. It stops repmgr, frees sites array and - * its addresses. + * its addresses. Note that it is possible for the sites array to exist + * and require deallocation independently of whether repmgr was started. */ int __repmgr_close(env) @@ -679,10 +877,15 @@ __repmgr_close(env) int ret; u_int i; - db_rep = env->rep_handle; + if ((db_rep = env->rep_handle) == NULL) + return (0); ret = 0; - ret = __repmgr_stop(env); + /* Stop repmgr and all of its threads if it was previously started. */ + if (IS_ENV_REPLICATED(env)) + ret = __repmgr_stop(env); + + /* Clean up sites array regardless of whether we could stop repmgr. */ if (db_rep->sites != NULL) { for (i = 0; i < db_rep->site_cnt; i++) { site = &db_rep->sites[i]; @@ -756,9 +959,9 @@ __repmgr_set_ack_policy(dbenv, policy) DB_ENV *dbenv; int policy; { + ENV *env; DB_REP *db_rep; DB_THREAD_INFO *ip; - ENV *env; REP *rep; int ret; @@ -823,6 +1026,208 @@ __repmgr_get_ack_policy(dbenv, policy) } /* + * PUBLIC: int __repmgr_set_incoming_queue_max __P((DB_ENV *, u_int32_t, + * PUBLIC: u_int32_t)); + * + * Sets the maximum amount of dynamic memory used by the Replication Manager + * incoming queue. + */ +int +__repmgr_set_incoming_queue_max(dbenv, gbytes, bytes) + DB_ENV *dbenv; + u_int32_t gbytes, bytes; +{ + DB_REP *db_rep; + DB_THREAD_INFO *ip; + ENV *env; + REP *rep; + + env = dbenv->env; + db_rep = env->rep_handle; + rep = db_rep->region; + + ENV_NOT_CONFIGURED( + env, db_rep->region, "DB_ENV->repmgr_set_incoming_queue_max", + DB_INIT_REP); + + if (APP_IS_BASEAPI(env)) { + __db_errx(env, "%s %s", + "DB_ENV->repmgr_set_incoming_queue_max:", + "cannot call from base replication application"); + return (EINVAL); + } + + /* + * If the caller provided 0 for the size, the size will be unlimited. + */ + if (gbytes == 0 && bytes == 0) { + gbytes = UINT32_MAX; + bytes = GIGABYTE - 1; + } + + while (bytes >= GIGABYTE) { + bytes -= GIGABYTE; + if (gbytes < UINT32_MAX) + gbytes++; + } + + if (REP_ON(env)) { + ENV_ENTER(env, ip); + MUTEX_LOCK(env, rep->mtx_repmgr); + rep->inqueue_max_gbytes = gbytes; + rep->inqueue_max_bytes = bytes; + __repmgr_set_incoming_queue_redzone(rep, gbytes, bytes); + MUTEX_UNLOCK(env, rep->mtx_repmgr); + ENV_LEAVE(env, ip); + } else { + db_rep->inqueue_max_gbytes = gbytes; + db_rep->inqueue_max_bytes = bytes; + } + + /* + * Setting incoming queue maximum sizes makes this a replication + * manager application. + */ + APP_SET_REPMGR(env); + return (0); +} + +/* + * PUBLIC: int __repmgr_get_incoming_queue_max __P((DB_ENV *, u_int32_t *, + * PUBLIC: u_int32_t *)); + * + * Gets the maximum amount of dynamic memory that can be used by the + * Replicaton Manager incoming queue. + */ +int +__repmgr_get_incoming_queue_max(dbenv, gbytesp, bytesp) + DB_ENV *dbenv; + u_int32_t *gbytesp, *bytesp; +{ + ENV *env; + DB_THREAD_INFO *ip; + DB_REP *db_rep; + REP *rep; + + env = dbenv->env; + db_rep = env->rep_handle; + rep = db_rep->region; + + if (REP_ON(env)) { + ENV_ENTER(env, ip); + MUTEX_LOCK(env, rep->mtx_repmgr); + *gbytesp = rep->inqueue_max_gbytes; + *bytesp = rep->inqueue_max_bytes; + MUTEX_UNLOCK(env, rep->mtx_repmgr); + ENV_LEAVE(env, ip); + } else { + *gbytesp = db_rep->inqueue_max_gbytes; + *bytesp = db_rep->inqueue_max_bytes; + } + + return (0); +} + +/* + * PUBLIC: void __repmgr_set_incoming_queue_redzone __P((void *, u_int32_t, + * PUBLIC: u_int32_t)); + * + * Sets the lower bound of the repmgr incoming queue red zone. + * !!! Assumes caller holds mtx_repmgr lock. + * + * Note that we can't simply get the REP* address from the env as we usually do, + * because at the time of this call it may not have been linked into there yet. + * Also note that, REP is not a public structure, so we use "void *" here. + */ +void __repmgr_set_incoming_queue_redzone(rep_, gbytes, bytes) + void *rep_; + u_int32_t gbytes, bytes; +{ + REP *rep; + double rdgbytes, rdbytes; + + rep = rep_; + + /* + * We use 'double' values to do the computation for precision, and + * to avoid overflow. + */ + rdgbytes = gbytes * 1.00 * DB_REPMGR_INQUEUE_REDZONE_PERCENT / 100.00; + rdbytes = (rdgbytes - (u_int32_t)rdgbytes) * GIGABYTE; + rdbytes += bytes * 1.00 * DB_REPMGR_INQUEUE_REDZONE_PERCENT / 100.00; + if (rdbytes >= GIGABYTE) { + rdgbytes += 1; + rdbytes -= GIGABYTE; + } + rep->inqueue_rz_gbytes = (u_int32_t)rdgbytes; + rep->inqueue_rz_bytes = (u_int32_t)rdbytes; +} + +/* + * PUBLIC: int __repmgr_get_incoming_queue_redzone __P((DB_ENV *, + * PUBLIC: u_int32_t *, u_int32_t *)); + * + * Gets the lower bound of the repmgr incoming queue red zone. + * This method must be called after environment open. + */ +int __repmgr_get_incoming_queue_redzone(dbenv, gbytesp, bytesp) + DB_ENV *dbenv; + u_int32_t *gbytesp, *bytesp; +{ + DB_REP *db_rep; + DB_THREAD_INFO *ip; + ENV *env; + REP *rep; + + env = dbenv->env; + db_rep = env->rep_handle; + rep = db_rep->region; + + ENV_REQUIRES_CONFIG( + env, db_rep->region, "__repmgr_get_incoming_queue_redzone", + DB_INIT_REP); + + ENV_ENTER(env, ip); + MUTEX_LOCK(env, rep->mtx_repmgr); + *gbytesp = rep->inqueue_rz_gbytes; + *bytesp = rep->inqueue_rz_bytes; + MUTEX_UNLOCK(env, rep->mtx_repmgr); + ENV_LEAVE(env, ip); + + return (0); +} + +/* + * PUBLIC: int __repmgr_get_incoming_queue_fullevent __P((DB_ENV *, + * PUBLIC: int *)); + * + * Return whether the DB_EVENT_REP_INQUEUE_FULL event firing is + * turned on or off. + * This method must be called after environment open. + */ +int __repmgr_get_incoming_queue_fullevent(dbenv, onoffp) + DB_ENV *dbenv; + int *onoffp; +{ + DB_REP *db_rep; + ENV *env; + REP *rep; + + env = dbenv->env; + db_rep = env->rep_handle; + rep = db_rep->region; + + ENV_REQUIRES_CONFIG( + env, db_rep->region, + "DB_ENV->__repmgr_get_incoming_queue_fullevent", + DB_INIT_REP); + + *onoffp = rep->inqueue_full_event_on ? 1 : 0; + + return (0); +} + +/* * PUBLIC: int __repmgr_env_create __P((ENV *, DB_REP *)); */ int @@ -837,7 +1242,13 @@ __repmgr_env_create(env, db_rep) db_rep->connection_retry_wait = DB_REPMGR_DEFAULT_CONNECTION_RETRY; db_rep->election_retry_wait = DB_REPMGR_DEFAULT_ELECTION_RETRY; db_rep->config_nsites = 0; + ADJUST_AUTOTAKEOVER_WAITS(db_rep, DB_REPMGR_DEFAULT_ACK_TIMEOUT); db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM; + db_rep->inqueue_max_gbytes = 0; + db_rep->inqueue_max_bytes = 0; +#ifdef HAVE_REPLICATION_LISTENER_TAKEOVER + FLD_SET(db_rep->config, REP_C_AUTOTAKEOVER); +#endif FLD_SET(db_rep->config, REP_C_ELECTIONS); FLD_SET(db_rep->config, REP_C_2SITE_STRICT); @@ -846,7 +1257,8 @@ __repmgr_env_create(env, db_rep) TAILQ_INIT(&db_rep->connections); TAILQ_INIT(&db_rep->retries); - db_rep->input_queue.size = 0; + db_rep->input_queue.gbytes = 0; + db_rep->input_queue.bytes = 0; STAILQ_INIT(&db_rep->input_queue.header); __repmgr_env_create_pf(db_rep); @@ -944,6 +1356,15 @@ __repmgr_await_threads(env) * of a connector thread. */ + /* Takeover thread. */ + if (db_rep->takeover_thread != NULL) { + if ((t_ret = __repmgr_thread_join(db_rep->takeover_thread)) != + 0 && ret == 0) + ret = t_ret; + __os_free(env, db_rep->takeover_thread); + db_rep->takeover_thread = NULL; + } + /* Message processing threads. */ for (i = 0; i < db_rep->nthreads && db_rep->messengers[i] != NULL; i++) { @@ -1178,7 +1599,7 @@ get_shared_netaddr(env, eid, netaddr) MUTEX_LOCK(env, rep->mtx_repmgr); if ((u_int)eid >= rep->site_cnt) { - ret = DB_NOTFOUND; + ret = USR_ERR(env, DB_NOTFOUND); goto err; } DB_ASSERT(env, rep->siteinfo_off != INVALID_ROFF); @@ -1423,7 +1844,7 @@ send_msg_self(env, iovecs, nmsg) u_int32_t nmsg; { REPMGR_MESSAGE *msg; - size_t align, bodysize, structsize; + size_t align, bodysize, msgsize, structsize; u_int8_t *membase; int ret; @@ -1431,10 +1852,12 @@ send_msg_self(env, iovecs, nmsg) bodysize = iovecs->total_bytes - __REPMGR_MSG_HDR_SIZE; structsize = (size_t)DB_ALIGN((size_t)(sizeof(REPMGR_MESSAGE) + nmsg * sizeof(DBT)), align); - if ((ret = __os_malloc(env, structsize + bodysize, &membase)) != 0) + msgsize = structsize + bodysize; + if ((ret = __os_malloc(env, msgsize, &membase)) != 0) return (ret); msg = (void*)membase; + msg->size = msgsize; membase += structsize; /* @@ -1616,13 +2039,14 @@ __repmgr_send_request(db_channel, request, nrequest, response, timeout, flags) } ENV_ENTER(env, ip); - ret = get_channel_connection(channel, &conn); - ENV_LEAVE(env, ip); - if (ret != 0) - return (ret); + if ((ret = get_channel_connection(channel, &conn)) != 0) + goto out; - if (conn == NULL) - return (request_self(env, request, nrequest, response, flags)); + /* If conn is NULL, call request_self and then we are done here. */ + if (conn == NULL) { + ret = request_self(env, request, nrequest, response, flags); + goto out; + } /* Find an available array slot, or grow the array if necessary. */ LOCK_MUTEX(db_rep->mutex); @@ -1670,7 +2094,7 @@ __repmgr_send_request(db_channel, request, nrequest, response, timeout, flags) LOCK_MUTEX(db_rep->mutex); F_CLR(&conn->responses[i], RESP_IN_USE | RESP_THREAD_WAITING); UNLOCK_MUTEX(db_rep->mutex); - return (ret); + goto out; } timeout = timeout > 0 ? timeout : db_channel->timeout; @@ -1688,7 +2112,7 @@ __repmgr_send_request(db_channel, request, nrequest, response, timeout, flags) * to wake up those threads, with a COMPLETE indication and an * error code. That's more than we want to tackle here. */ - return (ret); + goto out; } /* @@ -1732,7 +2156,7 @@ __repmgr_send_request(db_channel, request, nrequest, response, timeout, flags) sz = conn->iovecs.vectors[0].iov_len; if ((ret = __os_malloc(env, sz, &dummy)) != 0) - goto out; + goto out_unlck; __repmgr_iovec_init(&conn->iovecs); DB_INIT_DBT(resp->dbt, dummy, sz); __repmgr_add_dbt(&conn->iovecs, &resp->dbt); @@ -1740,8 +2164,9 @@ __repmgr_send_request(db_channel, request, nrequest, response, timeout, flags) } } -out: +out_unlck: UNLOCK_MUTEX(db_rep->mutex); +out: ENV_LEAVE(env, ip); return (ret); } @@ -2168,6 +2593,7 @@ __repmgr_channel_close(dbchan, flags) { ENV *env; DB_REP *db_rep; + DB_THREAD_INFO *ip; REPMGR_CONNECTION *conn; CHANNEL *channel; u_int32_t i; @@ -2182,6 +2608,7 @@ __repmgr_channel_close(dbchan, flags) * Disable connection(s) (if not already done due to an error having * occurred previously); release our reference to conn struct(s). */ + ENV_ENTER(env, ip); LOCK_MUTEX(db_rep->mutex); if (dbchan->eid >= 0) { conn = channel->c.conn; @@ -2218,6 +2645,7 @@ __repmgr_channel_close(dbchan, flags) __os_free(env, channel); __os_free(env, dbchan); + ENV_LEAVE(env, ip); return (ret); } @@ -2369,29 +2797,26 @@ join_group_at_site(env, addrp) repmgr_netaddr_t *addrp; { DB_REP *db_rep; + REP *rep; REPMGR_CONNECTION *conn; SITE_STRING_BUFFER addr_buf; repmgr_netaddr_t addr, myaddr; __repmgr_gm_fwd_args fwd; __repmgr_site_info_args site_info; + __repmgr_v4site_info_args v4site_info; u_int8_t *p, *response_buf, siteinfo_buf[MAX_MSG_BUF]; char host_buf[MAXHOSTNAMELEN + 1], *host; u_int32_t gen, type; - size_t len; + size_t host_len, msg_len, req_len; int ret, t_ret; db_rep = env->rep_handle; + rep = db_rep->region; LOCK_MUTEX(db_rep->mutex); myaddr = SITE_FROM_EID(db_rep->self_eid)->net_addr; UNLOCK_MUTEX(db_rep->mutex); - len = strlen(myaddr.host) + 1; - DB_INIT_DBT(site_info.host, myaddr.host, len); - site_info.port = myaddr.port; - site_info.flags = 0; - ret = __repmgr_site_info_marshal(env, - &site_info, siteinfo_buf, sizeof(siteinfo_buf), &len); - DB_ASSERT(env, ret == 0); + host_len = strlen(myaddr.host) + 1; conn = NULL; response_buf = NULL; @@ -2399,14 +2824,35 @@ join_group_at_site(env, addrp) RPRINT(env, (env, DB_VERB_REPMGR_MISC, "try join request to site %s", __repmgr_format_addr_loc(addrp, addr_buf))); retry: - if ((ret = make_request_conn(env, addrp, &conn)) != 0) + if ((ret = __repmgr_make_request_conn(env, addrp, &conn)) != 0) return (ret); + DB_ASSERT(env, conn->version > 0 && conn->version <= DB_REPMGR_VERSION); + if (conn->version < 5) { + DB_INIT_DBT(v4site_info.host, myaddr.host, host_len); + v4site_info.port = myaddr.port; + v4site_info.flags = 0; + ret = __repmgr_v4site_info_marshal(env, + &v4site_info, siteinfo_buf, sizeof(siteinfo_buf), &req_len); + } else { + DB_INIT_DBT(site_info.host, myaddr.host, host_len); + site_info.port = myaddr.port; + site_info.status = 0; + site_info.flags = 0; + if (IS_VIEW_SITE(env)) + FLD_SET(site_info.flags, SITE_VIEW); + if (rep->priority > 0) + FLD_SET(site_info.flags, SITE_JOIN_ELECTABLE); + ret = __repmgr_site_info_marshal(env, + &site_info, siteinfo_buf, sizeof(siteinfo_buf), &req_len); + } + DB_ASSERT(env, ret == 0); + /* Preserve separate request length in case there is a retry. */ if ((ret = __repmgr_send_sync_msg(env, conn, - REPMGR_JOIN_REQUEST, siteinfo_buf, (u_int32_t)len)) != 0) + REPMGR_JOIN_REQUEST, siteinfo_buf, (u_int32_t)req_len)) != 0) goto err; - if ((ret = read_own_msg(env, - conn, &type, &response_buf, &len)) != 0) + if ((ret = __repmgr_read_own_msg(env, + conn, &type, &response_buf, &msg_len)) != 0) goto err; if (type == REPMGR_GM_FAILURE) { @@ -2429,7 +2875,7 @@ retry: goto err; ret = __repmgr_gm_fwd_unmarshal(env, &fwd, - response_buf, len, &p); + response_buf, msg_len, &p); DB_ASSERT(env, ret == 0); if (fwd.gen > gen) { if (fwd.host.size > MAXHOSTNAMELEN + 1) { @@ -2456,7 +2902,8 @@ retry: } } if (type == REPMGR_JOIN_SUCCESS) - ret = __repmgr_refresh_membership(env, response_buf, len); + ret = __repmgr_refresh_membership(env, response_buf, msg_len, + conn->version); else ret = DB_REP_UNAVAIL; /* Invalid response: protocol violation */ @@ -2476,129 +2923,6 @@ err: } /* - * Reads a whole message, when we expect to get a REPMGR_OWN_MSG. - */ -static int -read_own_msg(env, conn, typep, bufp, lenp) - ENV *env; - REPMGR_CONNECTION *conn; - u_int32_t *typep; - u_int8_t **bufp; - size_t *lenp; -{ - __repmgr_msg_hdr_args msg_hdr; - u_int8_t *buf; - u_int32_t type; - size_t size; - int ret; - - __repmgr_reset_for_reading(conn); - if ((ret = __repmgr_read_conn(conn)) != 0) - goto err; - ret = __repmgr_msg_hdr_unmarshal(env, &msg_hdr, - conn->msg_hdr_buf, __REPMGR_MSG_HDR_SIZE, NULL); - DB_ASSERT(env, ret == 0); - - if ((conn->msg_type = msg_hdr.type) != REPMGR_OWN_MSG) { - ret = DB_REP_UNAVAIL; /* Protocol violation. */ - goto err; - } - type = REPMGR_OWN_MSG_TYPE(msg_hdr); - if ((size = (size_t)REPMGR_OWN_BUF_SIZE(msg_hdr)) > 0) { - conn->reading_phase = DATA_PHASE; - __repmgr_iovec_init(&conn->iovecs); - - if ((ret = __os_malloc(env, size, &buf)) != 0) - goto err; - conn->input.rep_message = NULL; - - __repmgr_add_buffer(&conn->iovecs, buf, size); - if ((ret = __repmgr_read_conn(conn)) != 0) { - __os_free(env, buf); - goto err; - } - *bufp = buf; - } - - *typep = type; - *lenp = size; - -err: - return (ret); -} - -static int -make_request_conn(env, addr, connp) - ENV *env; - repmgr_netaddr_t *addr; - REPMGR_CONNECTION **connp; -{ - DBT vi; - __repmgr_msg_hdr_args msg_hdr; - __repmgr_version_confirmation_args conf; - REPMGR_CONNECTION *conn; - int alloc, ret, unused; - - alloc = FALSE; - if ((ret = __repmgr_connect(env, addr, &conn, &unused)) != 0) - return (ret); - conn->type = APP_CONNECTION; - - /* Read a handshake msg, to get version confirmation and parameters. */ - if ((ret = __repmgr_read_conn(conn)) != 0) - goto err; - /* - * We can only get here after having read the full 9 bytes that we - * expect, so this can't fail. - */ - DB_ASSERT(env, conn->reading_phase == SIZES_PHASE); - ret = __repmgr_msg_hdr_unmarshal(env, &msg_hdr, - conn->msg_hdr_buf, __REPMGR_MSG_HDR_SIZE, NULL); - DB_ASSERT(env, ret == 0); - __repmgr_iovec_init(&conn->iovecs); - conn->reading_phase = DATA_PHASE; - - if ((ret = __repmgr_prepare_simple_input(env, conn, &msg_hdr)) != 0) - goto err; - alloc = TRUE; - - if ((ret = __repmgr_read_conn(conn)) != 0) - goto err; - - /* - * Analyze the handshake msg, and stash relevant info. - */ - if ((ret = __repmgr_find_version_info(env, conn, &vi)) != 0) - goto err; - DB_ASSERT(env, vi.size > 0); - if ((ret = __repmgr_version_confirmation_unmarshal(env, - &conf, vi.data, vi.size, NULL)) != 0) - goto err; - - if (conf.version < GM_MIN_VERSION) { - ret = DB_REP_UNAVAIL; - goto err; - } - conn->version = conf.version; - -err: - if (alloc) { - DB_ASSERT(env, conn->input.repmgr_msg.cntrl.size > 0); - __os_free(env, conn->input.repmgr_msg.cntrl.data); - DB_ASSERT(env, conn->input.repmgr_msg.rec.size > 0); - __os_free(env, conn->input.repmgr_msg.rec.data); - } - __repmgr_reset_for_reading(conn); - if (ret == 0) - *connp = conn; - else { - (void)__repmgr_close_connection(env, conn); - (void)__repmgr_destroy_conn(env, conn); - } - return (ret); -} - -/* * PUBLIC: int __repmgr_site __P((DB_ENV *, * PUBLIC: const char *, u_int, DB_SITE **, u_int32_t)); */ @@ -2640,9 +2964,9 @@ site_by_addr(env, host, port, sitep) if ((ret = addr_chk(env, host, port)) != 0) return (ret); + ENV_ENTER(env, ip); if (REP_ON(env)) { LOCK_MUTEX(db_rep->mutex); - ENV_ENTER(env, ip); locked = TRUE; } else locked = FALSE; @@ -2654,10 +2978,9 @@ site_by_addr(env, host, port, sitep) * we want the DB_SITE handle to point to; just like site_by_eid() does. */ host = site->net_addr.host; - if (locked) { - ENV_LEAVE(env, ip); + if (locked) UNLOCK_MUTEX(db_rep->mutex); - } + ENV_LEAVE(env, ip); if (ret != 0) return (ret); @@ -2723,7 +3046,7 @@ init_dbsite(env, eid, host, port, sitep) dbsite->get_address = __repmgr_get_site_address; dbsite->get_config = __repmgr_get_config; dbsite->get_eid = __repmgr_get_eid; - dbsite->set_config = __repmgr_site_config; + dbsite->set_config = __repmgr_site_config_pp; dbsite->remove = __repmgr_remove_site_pp; dbsite->close = __repmgr_site_close; @@ -2756,9 +3079,16 @@ __repmgr_get_eid(dbsite, eidp) DB_SITE *dbsite; int *eidp; { + DB_THREAD_INFO *ip; + ENV *env; int ret; - if ((ret = refresh_site(dbsite)) != 0) + env = dbsite->env; + + ENV_ENTER(env, ip); + ret = refresh_site(dbsite); + ENV_LEAVE(env, ip); + if (ret != 0) return (ret); if (F_ISSET(dbsite, DB_SITE_PREOPEN)) { @@ -2791,8 +3121,11 @@ __repmgr_get_config(dbsite, which, valuep) env = dbsite->env; db_rep = env->rep_handle; - if ((ret = refresh_site(dbsite)) != 0) + ENV_ENTER(env, ip); + if ((ret = refresh_site(dbsite)) != 0) { + ENV_LEAVE(env, ip); return (ret); + } LOCK_MUTEX(db_rep->mutex); DB_ASSERT(env, IS_VALID_EID(dbsite->eid)); site = SITE_FROM_EID(dbsite->eid); @@ -2800,32 +3133,52 @@ __repmgr_get_config(dbsite, which, valuep) rep = db_rep->region; infop = env->reginfo; - ENV_ENTER(env, ip); MUTEX_LOCK(env, rep->mtx_repmgr); sites = R_ADDR(infop, rep->siteinfo_off); site->config = sites[dbsite->eid].config; MUTEX_UNLOCK(env, rep->mtx_repmgr); - ENV_LEAVE(env, ip); } *valuep = FLD_ISSET(site->config, which) ? 1 : 0; UNLOCK_MUTEX(db_rep->mutex); + ENV_LEAVE(env, ip); return (0); } /* - * PUBLIC: int __repmgr_site_config __P((DB_SITE *, u_int32_t, u_int32_t)); + * PUBLIC: int __repmgr_site_config_pp __P((DB_SITE *, u_int32_t, u_int32_t)); */ int -__repmgr_site_config(dbsite, which, value) +__repmgr_site_config_pp(dbsite, which, value) DB_SITE *dbsite; u_int32_t which; u_int32_t value; { - DB_REP *db_rep; DB_THREAD_INFO *ip; ENV *env; + int ret; + + env = dbsite->env; + + ENV_ENTER(env, ip); + ret = __repmgr_site_config_int(dbsite, which, value); + ENV_LEAVE(env, ip); + + return (ret); +} + +/* + * PUBLIC: int __repmgr_site_config_int __P((DB_SITE *, u_int32_t, u_int32_t)); + */ +int +__repmgr_site_config_int(dbsite, which, value) + DB_SITE *dbsite; + u_int32_t which; + u_int32_t value; +{ + DB_REP *db_rep; + ENV *env; REGINFO *infop; REP *rep; REPMGR_SITE *site; @@ -2875,7 +3228,6 @@ __repmgr_site_config(dbsite, which, value) infop = env->reginfo; LOCK_MUTEX(db_rep->mutex); - ENV_ENTER(env, ip); MUTEX_LOCK(env, rep->mtx_repmgr); sites = R_ADDR(infop, rep->siteinfo_off); site = SITE_FROM_EID(dbsite->eid); @@ -2896,7 +3248,6 @@ __repmgr_site_config(dbsite, which, value) rep->siteinfo_seq++; } MUTEX_UNLOCK(env, rep->mtx_repmgr); - ENV_LEAVE(env, ip); UNLOCK_MUTEX(db_rep->mutex); } else { site = SITE_FROM_EID(dbsite->eid); @@ -2930,7 +3281,6 @@ set_local_site(dbsite, value) if (REP_ON(env)) { rep = db_rep->region; LOCK_MUTEX(db_rep->mutex); - ENV_ENTER(env, ip); MUTEX_LOCK(env, rep->mtx_repmgr); locked = TRUE; /* Make sure we're in sync first. */ @@ -2941,31 +3291,32 @@ set_local_site(dbsite, value) __db_errx(env, DB_STR("3666", "A previously given local site may not be unset")); ret = EINVAL; - } else if (IS_VALID_EID(db_rep->self_eid) && - db_rep->self_eid != dbsite->eid) { - __db_errx(env, DB_STR("3667", - "A (different) local site has already been set")); - ret = EINVAL; - } else { - DB_ASSERT(env, IS_VALID_EID(dbsite->eid)); - site = SITE_FROM_EID(dbsite->eid); - if (FLD_ISSET(site->config, - DB_BOOTSTRAP_HELPER | DB_REPMGR_PEER)) { - __db_errx(env, DB_STR("3668", - "Local site cannot have HELPER or PEER attributes")); + } else if (value) { + if (IS_VALID_EID(db_rep->self_eid) && + db_rep->self_eid != dbsite->eid) { + __db_errx(env, DB_STR("3697", + "A (different) local site has already been set")); ret = EINVAL; + } else { + DB_ASSERT(env, IS_VALID_EID(dbsite->eid)); + site = SITE_FROM_EID(dbsite->eid); + if (FLD_ISSET(site->config, + DB_BOOTSTRAP_HELPER | DB_REPMGR_PEER)) { + __db_errx(env, DB_STR("3698", + "Local site cannot have HELPER or PEER attributes")); + ret = EINVAL; + } } } - if (ret == 0) { + if (ret == 0 && value) { db_rep->self_eid = dbsite->eid; if (locked) { - rep->self_eid = dbsite->eid; + rep->self_eid = db_rep->self_eid; rep->siteinfo_seq++; } } if (locked) { MUTEX_UNLOCK(env, rep->mtx_repmgr); - ENV_LEAVE(env, ip); UNLOCK_MUTEX(db_rep->mutex); } return (ret); @@ -2998,7 +3349,7 @@ refresh_site(dbsite) } static int -__repmgr_remove_site_pp(dbsite) +__repmgr_remove_and_close_site(dbsite) DB_SITE *dbsite; { int ret, t_ret; @@ -3011,6 +3362,23 @@ __repmgr_remove_site_pp(dbsite) */ if ((t_ret = __repmgr_site_close(dbsite)) != 0 && ret == 0) ret = t_ret; + + return (ret); +} + +static int +__repmgr_remove_site_pp(dbsite) + DB_SITE *dbsite; +{ + ENV *env; + DB_THREAD_INFO *ip; + int ret; + + env = dbsite->env; + + ENV_ENTER(env, ip); + ret = __repmgr_remove_and_close_site(dbsite); + ENV_LEAVE(env, ip); return (ret); } @@ -3024,6 +3392,7 @@ __repmgr_remove_site(dbsite) REPMGR_CONNECTION *conn; repmgr_netaddr_t addr; __repmgr_site_info_args site_info; + __repmgr_v4site_info_args v4site_info; u_int8_t *response_buf, siteinfo_buf[MAX_MSG_BUF]; size_t len; u_int32_t type; @@ -3046,23 +3415,33 @@ __repmgr_remove_site(dbsite) DB_ASSERT(env, IS_VALID_EID(master)); addr = SITE_FROM_EID(master)->net_addr; UNLOCK_MUTEX(db_rep->mutex); - len = strlen(dbsite->host) + 1; - DB_INIT_DBT(site_info.host, dbsite->host, len); - site_info.port = dbsite->port; - site_info.flags = 0; - ret = __repmgr_site_info_marshal(env, - &site_info, siteinfo_buf, sizeof(siteinfo_buf), &len); - DB_ASSERT(env, ret == 0); conn = NULL; response_buf = NULL; - if ((ret = make_request_conn(env, &addr, &conn)) != 0) + if ((ret = __repmgr_make_request_conn(env, &addr, &conn)) != 0) return (ret); + DB_ASSERT(env, conn->version > 0 && conn->version <= DB_REPMGR_VERSION); + if (conn->version < 5) { + DB_INIT_DBT(v4site_info.host, dbsite->host, len); + v4site_info.port = dbsite->port; + v4site_info.flags = 0; + ret = __repmgr_v4site_info_marshal(env, + &v4site_info, siteinfo_buf, sizeof(siteinfo_buf), &len); + } else { + DB_INIT_DBT(site_info.host, dbsite->host, len); + site_info.port = dbsite->port; + site_info.status = 0; + site_info.flags = 0; + ret = __repmgr_site_info_marshal(env, + &site_info, siteinfo_buf, sizeof(siteinfo_buf), &len); + } + DB_ASSERT(env, ret == 0); + if ((ret = __repmgr_send_sync_msg(env, conn, REPMGR_REMOVE_REQUEST, siteinfo_buf, (u_int32_t)len)) != 0) goto err; - if ((ret = read_own_msg(env, + if ((ret = __repmgr_read_own_msg(env, conn, &type, &response_buf, &len)) != 0) goto err; ret = type == REPMGR_REMOVE_SUCCESS ? 0 : DB_REP_UNAVAIL; @@ -3090,3 +3469,82 @@ __repmgr_site_close(dbsite) __os_free(dbsite->env, dbsite); return (0); } + +/* + * Demotes a participant site to a view. This is a one-way and one-time + * operation. + * + * The demotion occurs at the very end of repmgr_start() because it + * requires a select thread to perform the gmdb operations that remove + * the site from the replication group and immediately add the site back + * into the group as a view. The demotion also preserves any other threads + * created by repmgr_start() so that they are there to be used by the + * demoted site after it is re-added as a view site. + * + * We remove and re-add the site to propagate the site's change from + * participant to view to all sites in the replication group. This includes + * updates to each site's gmdb and in-memory site list. + */ +#define REPMGR_DEMOTION_MASTER_RETRIES 10 +#define REPMGR_DEMOTION_RETRY_USECS 500000 +static int +__repmgr_demote_site(env, eid) + ENV *env; + int eid; +{ + DB_REP *db_rep; + DB_SITE *dbsite; + REP *rep; + REPMGR_SITE *site; + int ret, t_ret, tries; + + db_rep = env->rep_handle; + rep = db_rep->region; + site = SITE_FROM_EID(eid); + dbsite = NULL; + + /* Inform other repmgr threads that a demotion is in progress. */ + db_rep->demotion_pending = TRUE; + + if ((ret = init_dbsite(env, eid, site->net_addr.host, + site->net_addr.port, &dbsite)) != 0) + goto err; + + /* + * We need a master to perform the gmdb updates. Poll periodically + * for a limited time to find one. + */ + tries = 0; + while (rep->master_id == DB_EID_INVALID) { + __os_yield(env, 0, REPMGR_DEMOTION_RETRY_USECS); + if (++tries >= REPMGR_DEMOTION_MASTER_RETRIES) { + ret = DB_REP_UNAVAIL; + goto err; + } + } + + /* Remove site from replication group. */ + if ((ret = __repmgr_remove_site(dbsite)) != 0) + goto err; + + /* + * Add site back into replication group as a view. This demotion is + * occurring because this site now has a view callback but its + * SITE_VIEW flag is not set. Now, __repmgr_join_group() will detect + * the view callback and set the SITE_VIEW flag before sending this + * site's information to the rest of the replication group. + */ + if ((ret = __repmgr_join_group(env)) != 0) + goto err; + +err: + /* Deallocates dbsite. */ + if (dbsite != NULL) { + t_ret = __repmgr_site_close(dbsite); + if (ret == 0 && t_ret != 0) + ret = t_ret; + } + /* Must reset demotion_pending before leaving this routine. */ + db_rep->demotion_pending = FALSE; + return (ret); +} diff --git a/src/repmgr/repmgr_msg.c b/src/repmgr/repmgr_msg.c index 13537823..71cb2ada 100644 --- a/src/repmgr/repmgr_msg.c +++ b/src/repmgr/repmgr_msg.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2005, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2005, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -15,15 +15,19 @@ #include "dbinc_auto/repmgr_auto.h" static int dispatch_app_message __P((ENV *, REPMGR_MESSAGE *)); -static int finish_gmdb_update __P((ENV *, - DB_THREAD_INFO *, DBT *, u_int32_t, u_int32_t, __repmgr_member_args *)); +static int finish_gmdb_update __P((ENV *, DB_THREAD_INFO *, + DBT *, u_int32_t, u_int32_t, u_int32_t, __repmgr_member_args *)); static int incr_gm_version __P((ENV *, DB_THREAD_INFO *, DB_TXN *)); -static void marshal_site_data __P((ENV *, u_int32_t, u_int8_t *, DBT *)); +static void marshal_site_data __P((ENV *, + u_int32_t, u_int32_t, u_int8_t *, DBT *)); static void marshal_site_key __P((ENV *, repmgr_netaddr_t *, u_int8_t *, DBT *, __repmgr_member_args *)); static int message_loop __P((ENV *, REPMGR_RUNNABLE *)); +static int preferred_master_takeover __P((ENV*)); static int process_message __P((ENV*, DBT*, DBT*, int)); static int reject_fwd __P((ENV *, REPMGR_CONNECTION *)); +static int rejoin_connections(ENV *); +static int rejoin_deferred_election(ENV *); static int rescind_pending __P((ENV *, DB_THREAD_INFO *, int, u_int32_t, u_int32_t)); static int resolve_limbo_int __P((ENV *, DB_THREAD_INFO *)); @@ -33,9 +37,13 @@ static int send_permlsn_conn __P((ENV *, REPMGR_CONNECTION *, u_int32_t, DB_LSN *)); static int serve_join_request __P((ENV *, DB_THREAD_INFO *, REPMGR_MESSAGE *)); +static int serve_lsnhist_request __P((ENV *, DB_THREAD_INFO *, + REPMGR_MESSAGE *)); +static int serve_readonly_master_request __P((ENV *, REPMGR_MESSAGE *)); static int serve_remove_request __P((ENV *, DB_THREAD_INFO *, REPMGR_MESSAGE *)); static int serve_repmgr_request __P((ENV *, REPMGR_MESSAGE *)); +static int serve_restart_client_request __P((ENV *, REPMGR_MESSAGE *)); /* * Map one of the phase-1/provisional membership status values to its @@ -72,6 +80,7 @@ message_loop(env, th) REPMGR_RUNNABLE *th; { DB_REP *db_rep; + DB_THREAD_INFO *ip; REP *rep; REPMGR_MESSAGE *msg; REPMGR_CONNECTION *conn; @@ -83,6 +92,7 @@ message_loop(env, th) COMPQUIET(membership, 0); db_rep = env->rep_handle; rep = db_rep->region; + ENV_ENTER(env, ip); LOCK_MUTEX(db_rep->mutex); while ((ret = __repmgr_queue_get(env, &msg, th)) == 0) { incremented = FALSE; @@ -141,7 +151,21 @@ message_loop(env, th) * detect it without the need for application * activity. */ - ret = __rep_flush(env->dbenv); + ret = __rep_flush_int(env); + } else if (db_rep->prefmas_pending == master_switch && + IS_PREFMAS_MODE(env) && + FLD_ISSET(rep->config, REP_C_PREFMAS_MASTER) && + F_ISSET(rep, REP_F_CLIENT)) { + RPRINT(env, (env, DB_VERB_REPMGR_MISC, +"message_loop heartbeat preferred master switch")); + /* + * We are a preferred master site currently + * running as a client and we have finished + * syncing with the temporary master. It is + * now time to take over as master. + */ + db_rep->prefmas_pending = no_action; + ret = preferred_master_takeover(env); } else { /* * Use heartbeat message to initiate rerequest @@ -162,6 +186,12 @@ message_loop(env, th) db_rep->non_rep_th--; if (ret != 0) goto out; + if (db_rep->view_mismatch) { + __db_errx(env, DB_STR("3699", + "Site is not recorded as a view in the group membership database")); + ret = EINVAL; + goto out; + } } /* * A return of DB_REP_UNAVAIL from __repmgr_queue_get() merely means we @@ -171,6 +201,7 @@ message_loop(env, th) ret = 0; out: UNLOCK_MUTEX(db_rep->mutex); + ENV_LEAVE(env, ip); return (ret); } @@ -341,16 +372,45 @@ process_message(env, control, rec, eid) break; case DB_REP_DUPMASTER: - /* - * Initiate an election if we're configured to be using - * elections, but only if we're *NOT* using leases. When using - * leases, there is never any uncertainty over which site is the - * rightful master, and only the loser gets the DUPMASTER return - * code. - */ - if ((ret = __repmgr_become_client(env)) == 0 && + if (IS_PREFMAS_MODE(env) && + FLD_ISSET(rep->config, REP_C_PREFMAS_MASTER)) { + /* + * The preferred master site must restart as a master + * so that it sends out a NEWMASTER to help the client + * sync. It must force a role change so that it + * advances its gen even though it is already master. + * This is needed if there was a temporary master at + * a higher gen that is now restarting as a client. + * A client won't process messages from a master at + * a lower gen than its own. + */ + ret = __repmgr_repstart(env, DB_REP_MASTER, + REP_START_FORCE_ROLECHG); + } else if (IS_PREFMAS_MODE(env) && + FLD_ISSET(rep->config, REP_C_PREFMAS_CLIENT) && + (ret = __repmgr_become_client(env)) == 0) { + /* + * The preferred master client site must restart as + * client without any elections to enable the preferred + * master site to preserve its own transactions. It + * uses an election thread to repeatedly perform client + * startups so that it will perform its client sync + * when the preferred master's gen has caught up. + */ + LOCK_MUTEX(db_rep->mutex); + ret = __repmgr_init_election(env, + ELECT_F_CLIENT_RESTART); + UNLOCK_MUTEX(db_rep->mutex); + } else if ((ret = __repmgr_become_client(env)) == 0 && FLD_ISSET(rep->config, REP_C_LEASE | REP_C_ELECTIONS) == REP_C_ELECTIONS) { + /* + * Initiate an election if we're configured to be using + * elections, but only if we're *NOT* using leases. + * When using leases, there is never any uncertainty + * over which site is the rightful master, and only the + * loser gets the DUPMASTER return code. + */ LOCK_MUTEX(db_rep->mutex); ret = __repmgr_init_election(env, ELECT_F_IMMED); UNLOCK_MUTEX(db_rep->mutex); @@ -406,6 +466,14 @@ DB_TEST_RECOVERY_LABEL t_ret = __op_rep_exit(env); if (ret == ENOENT) ret = 0; + else if (ret == DB_DELETED && db_rep->demotion_pending) + /* + * If a demotion is in progress, we want to keep + * the repmgr threads instead of bowing out because + * they are needed when we rejoin the replication group + * immediately as a view. + */ + ret = 0; else if (ret == DB_DELETED) ret = __repmgr_bow_out(env); if (t_ret != 0 && ret == 0) @@ -428,8 +496,10 @@ __repmgr_handle_event(env, event, info) void *info; { DB_REP *db_rep; + REP *rep; db_rep = env->rep_handle; + rep = db_rep->region; if (db_rep->selector == NULL) { /* Repmgr is not in use, so all events go to application. */ @@ -457,9 +527,46 @@ __repmgr_handle_event(env, event, info) /* Application still needs to see this. */ break; + case DB_EVENT_REP_MASTER: + case DB_EVENT_REP_STARTUPDONE: + /* + * Detect a rare case where a dupmaster or incomplete gmdb + * operation has left the site's gmdb inconsistent with + * a view callback definition. The user would have correctly + * defined a view callback and called repmgr_start(), but the + * gmdb operation to update this site to a view would have been + * incomplete or rolled back. The site cannot operate in this + * inconsistent state, so set an indicator to cause a message + * thread to panic and terminate. + * + * The one exception is during a demotion to view, when + * this inconsistency is expected for a short time. + */ + if (IS_VALID_EID(db_rep->self_eid) && + PARTICIPANT_TO_VIEW(db_rep, + SITE_FROM_EID(db_rep->self_eid)) && + !db_rep->demotion_pending) + db_rep->view_mismatch = TRUE; + + /* + * In preferred master mode, when the preferred master site + * finishes synchronizing with the temporary master it must + * prepare to take over as master. This is detected by the + * next heartbeat in a message thread, where the takeover is + * actually performed. + */ + if (event == DB_EVENT_REP_STARTUPDONE && + IS_PREFMAS_MODE(env) && + FLD_ISSET(rep->config, REP_C_PREFMAS_MASTER)) { + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "startupdone set preferred master switch")); + db_rep->prefmas_pending = master_switch; + } + break; default: break; } + COMPQUIET(info, NULL); return (DB_EVENT_NOT_HANDLED); } @@ -504,7 +611,7 @@ send_permlsn(env, generation, lsn) */ policy = site->ack_policy > 0 ? site->ack_policy : rep->perm_policy; - if (policy == DB_REPMGR_ACKS_NONE || + if (IS_VIEW_SITE(env) || policy == DB_REPMGR_ACKS_NONE || (IS_PEER_POLICY(policy) && rep->priority == 0)) ack = FALSE; else @@ -614,26 +721,149 @@ send_permlsn_conn(env, conn, generation, lsn) return (ret); } +/* + * Perform the steps on the preferred master site to take over again as + * preferred master from a temporary master. This routine should only be + * called after the preferred master has restarted as a client and finished + * a client sync with the temporary master. + * + * This routine makes a best effort to wait until all temporary master + * transactions have been applied on this site before taking over. + */ +static int +preferred_master_takeover(env) + ENV *env; +{ + DB_LOG *dblp; + DB_REP *db_rep; + LOG *lp; + REP *rep; + DB_LSN last_ready_lsn, ready_lsn, sync_lsn; + u_long usec; + u_int32_t gen, max_tries, tries; + int ret, synced; + + dblp = env->lg_handle; + lp = dblp->reginfo.primary; + db_rep = env->rep_handle; + rep = db_rep->region; + gen = 0; + ZERO_LSN(sync_lsn); + ret = 0; + + if (!IS_PREFMAS_MODE(env)) + return (ret); + + /* + * Start by making the temporary master a readonly master so that we + * can know when we have applied all of its transactions on this + * site before taking over. + */ + if ((ret = __repmgr_make_site_readonly_master(env, + 1, &gen, &sync_lsn)) != 0) + return (ret); + DB_ASSERT(env, gen >= rep->gen); + + /* + * Make a best effort to wait until this site has all transactions + * from the temporary master. We want to preserve temporary master + * transactions, but we can't wait forever. If we exceed our wait, + * we restart this site as preferred master anyway. This may + * sacrifice some temporary master transactions in order to preserve + * repgroup write availability. + * + * We restart the number of tries each time we make progress in + * transactions applied, until either we apply through sync_lsn or + * we exceed max_tries without progress. + */ + if ((ret = __repmgr_prefmas_get_wait(env, &max_tries, &usec)) != 0) + return (ret); + tries = 0; + synced = 0; + ZERO_LSN(ready_lsn); + ZERO_LSN(last_ready_lsn); + while (!synced && tries < max_tries) { + __os_yield(env, 0, usec); + tries++; + /* + * lp->ready_lsn is the next LSN we expect to receive, + * which also indicates how much we've applied. sync_lsn + * is the lp->lsn (indicating the next log record expected) + * from the other site. + */ + MUTEX_LOCK(env, rep->mtx_clientdb); + ready_lsn = lp->ready_lsn; + MUTEX_UNLOCK(env, rep->mtx_clientdb); + if (gen == rep->gen && LOG_COMPARE(&ready_lsn, &sync_lsn) >= 0) + synced = 1; + else if (LOG_COMPARE(&ready_lsn, &last_ready_lsn) >= 0) { + /* We are making progress, restart number of tries. */ + last_ready_lsn = ready_lsn; + tries = 0; + } + } + + /* Restart the remote readonly temporary master as a client. */ + if ((ret = __repmgr_restart_site_as_client(env, 1)) != 0) + return (ret); + + /* Restart this site as the preferred master, waiting for + * REP_LOCKOUT_MSG. The NEWCLIENT message sent back from + * restarting the other site as client can briefly lock + * REP_LOCKOUT_MSG to do some cleanup. We don't want this + * to cause the rep_start_int() call to restart this site + * as master to return 0 without doing anything. + */ + ret = __repmgr_become_master(env, REP_START_WAIT_LOCKMSG); + return (ret); +} + static int serve_repmgr_request(env, msg) ENV *env; REPMGR_MESSAGE *msg; { - DB_THREAD_INFO *ip; + DB_REP *db_rep; DBT *dbt; + DB_THREAD_INFO *ip; REPMGR_CONNECTION *conn; + u_int32_t mtype; int ret, t_ret; - ENV_ENTER(env, ip); - switch (REPMGR_OWN_MSG_TYPE(msg->msg_hdr)) { + db_rep = env->rep_handle; + ENV_GET_THREAD_INFO(env, ip); + conn = msg->v.gmdb_msg.conn; + mtype = REPMGR_OWN_MSG_TYPE(msg->msg_hdr); + switch (mtype) { case REPMGR_JOIN_REQUEST: ret = serve_join_request(env, ip, msg); break; + case REPMGR_LSNHIST_REQUEST: + ret = serve_lsnhist_request(env, ip, msg); + break; + case REPMGR_READONLY_MASTER: + ret = serve_readonly_master_request(env, msg); + break; case REPMGR_REJOIN: RPRINT(env, (env, DB_VERB_REPMGR_MISC, "One try at rejoining group automatically")); if ((ret = __repmgr_join_group(env)) == DB_REP_UNAVAIL) ret = __repmgr_bow_out(env); + else if (ret == 0 && IS_PREFMAS_MODE(env)) { + /* + * For preferred master mode, we need to get + * a "regular" connection to the other site without + * calling an election prematurely here. + */ + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "Establishing connections after rejoin")); + ret = rejoin_connections(env); + } else if (ret == 0 && db_rep->rejoin_pending) { + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "Calling deferred election after rejoin")); + ret = rejoin_deferred_election(env); + } + db_rep->rejoin_pending = FALSE; break; case REPMGR_REMOVE_REQUEST: ret = serve_remove_request(env, ip, msg); @@ -641,23 +871,32 @@ serve_repmgr_request(env, msg) case REPMGR_RESOLVE_LIMBO: ret = resolve_limbo_wrapper(env, ip); break; + case REPMGR_RESTART_CLIENT: + ret = serve_restart_client_request(env, msg); + break; case REPMGR_SHARING: dbt = &msg->v.gmdb_msg.request; - ret = __repmgr_refresh_membership(env, dbt->data, dbt->size); + ret = __repmgr_refresh_membership(env, dbt->data, dbt->size, + (conn == NULL ? DB_REPMGR_VERSION : conn->version)); break; default: ret = __db_unknown_path(env, "serve_repmgr_request"); break; } - if ((conn = msg->v.gmdb_msg.conn) != NULL) { + if (conn != NULL) { + /* + * A site that removed itself may have already closed its + * connections. Do not return an error and panic if we + * can't close the one-shot GMDB connection for a remove + * request here. + */ if ((t_ret = __repmgr_close_connection(env, conn)) != 0 && - ret == 0) + ret == 0 && mtype != REPMGR_REMOVE_REQUEST) ret = t_ret; if ((t_ret = __repmgr_decr_conn_ref(env, conn)) != 0 && ret == 0) ret = t_ret; } - ENV_LEAVE(env, ip); return (ret); } @@ -674,8 +913,10 @@ serve_join_request(env, ip, msg) { DB_REP *db_rep; REPMGR_CONNECTION *conn; + REPMGR_SITE *site; DBT *dbt; __repmgr_site_info_args site_info; + __repmgr_v4site_info_args v4site_info; u_int8_t *buf; char *host; size_t len; @@ -686,9 +927,18 @@ serve_join_request(env, ip, msg) COMPQUIET(status, 0); conn = msg->v.gmdb_msg.conn; + DB_ASSERT(env, conn->version > 0 && conn->version <= DB_REPMGR_VERSION); dbt = &msg->v.gmdb_msg.request; - ret = __repmgr_site_info_unmarshal(env, - &site_info, dbt->data, dbt->size, NULL); + if (conn->version < 5) { + ret = __repmgr_v4site_info_unmarshal(env, + &v4site_info, dbt->data, dbt->size, NULL); + site_info.host = v4site_info.host; + site_info.port = v4site_info.port; + site_info.status = v4site_info.flags; + site_info.flags = 0; + } else + ret = __repmgr_site_info_unmarshal(env, + &site_info, dbt->data, dbt->size, NULL); host = site_info.host.data; host[site_info.host.size - 1] = '\0'; @@ -703,7 +953,23 @@ serve_join_request(env, ip, msg) LOCK_MUTEX(db_rep->mutex); if ((ret = __repmgr_find_site(env, host, site_info.port, &eid)) == 0) { DB_ASSERT(env, eid != db_rep->self_eid); - status = SITE_FROM_EID(eid)->membership; + site = SITE_FROM_EID(eid); + status = site->membership; + /* + * Remote site electability is usually exchanged when + * a connection is established, but when a new site + * joins the repgroup there is a brief gap between the + * join and the connection. Record electability for + * the joining site so that we are not overly conservative + * about the number of acks we require for a PERM + * transaction if the joining site is unelectable. + */ + if (FLD_ISSET(site_info.flags, SITE_JOIN_ELECTABLE)) { + F_SET(site, SITE_ELECTABLE); + FLD_CLR(site_info.flags, SITE_JOIN_ELECTABLE); + } else + F_CLR(site, SITE_ELECTABLE); + F_SET(site, SITE_HAS_PRIO); } UNLOCK_MUTEX(db_rep->mutex); if (ret != 0) @@ -712,7 +978,8 @@ serve_join_request(env, ip, msg) switch (status) { case 0: case SITE_ADDING: - ret = __repmgr_update_membership(env, ip, eid, SITE_ADDING); + ret = __repmgr_update_membership(env, ip, eid, SITE_ADDING, + site_info.flags); break; case SITE_PRESENT: /* Already in desired state. */ @@ -729,7 +996,7 @@ serve_join_request(env, ip, msg) goto err; LOCK_MUTEX(db_rep->mutex); - ret = __repmgr_marshal_member_list(env, &buf, &len); + ret = __repmgr_marshal_member_list(env, conn->version, &buf, &len); UNLOCK_MUTEX(db_rep->mutex); if (ret != 0) goto err; @@ -760,6 +1027,7 @@ serve_remove_request(env, ip, msg) REPMGR_SITE *site; DBT *dbt; __repmgr_site_info_args site_info; + __repmgr_v4site_info_args v4site_info; char *host; u_int32_t status, type; int eid, ret, t_ret; @@ -768,9 +1036,18 @@ serve_remove_request(env, ip, msg) db_rep = env->rep_handle; conn = msg->v.gmdb_msg.conn; + DB_ASSERT(env, conn->version > 0 && conn->version <= DB_REPMGR_VERSION); dbt = &msg->v.gmdb_msg.request; - ret = __repmgr_site_info_unmarshal(env, - &site_info, dbt->data, dbt->size, NULL); + if (conn->version < 5) { + ret = __repmgr_v4site_info_unmarshal(env, + &v4site_info, dbt->data, dbt->size, NULL); + site_info.host = v4site_info.host; + site_info.port = v4site_info.port; + site_info.status = v4site_info.flags; + site_info.flags = 0; + } else + ret = __repmgr_site_info_unmarshal(env, + &site_info, dbt->data, dbt->size, NULL); host = site_info.host.data; host[site_info.host.size - 1] = '\0'; @@ -810,7 +1087,8 @@ serve_remove_request(env, ip, msg) break; case SITE_PRESENT: case SITE_DELETING: - ret = __repmgr_update_membership(env, ip, eid, SITE_DELETING); + ret = __repmgr_update_membership(env, ip, eid, SITE_DELETING, + site_info.flags); break; default: ret = __db_unknown_path(env, "serve_remove_request"); @@ -829,7 +1107,175 @@ err: default: return (ret); } - return (__repmgr_send_sync_msg(env, conn, type, NULL, 0)); + /* + * It is possible when a site removes itself that by now it has + * already acted on the first GMDB update and closed its connections. + * Do not return an error and panic if we can't send the final + * status of the remove operation. + */ + if ((ret = __repmgr_send_sync_msg(env, conn, type, NULL, 0)) != 0) + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "Problem sending remove site status message %d", ret)); + return (0); +} + +/* + * Serve the REPMGR_RESTART_CLIENT message by restarting this site as a + * client if it is not already a client. Always sends back a + * REPMGR_PREFMAS_SUCCESS message with an empty payload. + */ +static int +serve_restart_client_request(env, msg) + ENV *env; + REPMGR_MESSAGE *msg; +{ + DB_REP *db_rep; + REP * rep; + REPMGR_CONNECTION *conn; + int ret, t_ret; + + db_rep = env->rep_handle; + rep = db_rep->region; + ret = 0; + + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "Serving restart_client request")); + conn = msg->v.gmdb_msg.conn; + DB_ASSERT(env, conn->version > 0 && conn->version <= DB_REPMGR_VERSION); + /* No need to read payload - it is just a dummy byte. */ + + if (IS_PREFMAS_MODE(env) && !F_ISSET(rep, REP_F_CLIENT)) + ret = __repmgr_become_client(env); + + if ((t_ret = __repmgr_send_sync_msg(env, conn, + REPMGR_PREFMAS_SUCCESS, NULL, 0)) != 0) + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "Problem sending restart client success message %d", ret)); + + if (ret == 0 && t_ret != 0) + ret = t_ret; + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "Request for restart_client returning %d", ret)); + return (ret); +} + +/* + * Serve the REPMGR_READONLY_MASTER message by turning this site into a + * readonly master. Always sends back a REPMGR_READONLY_RESPONSE message with + * a payload containing this site's gen and next LSN expected. If there are + * any errors, the gen is 0 and the next LSN is [0,0]. + */ +static int +serve_readonly_master_request(env, msg) + ENV *env; + REPMGR_MESSAGE *msg; +{ + REPMGR_CONNECTION *conn; + __repmgr_permlsn_args permlsn; + u_int8_t buf[__REPMGR_PERMLSN_SIZE]; + int ret, t_ret; + + ret = 0; + + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "Serving readonly_master request")); + conn = msg->v.gmdb_msg.conn; + DB_ASSERT(env, conn->version > 0 && conn->version <= DB_REPMGR_VERSION); + /* No need to read payload - it is just a dummy byte. */ + + if (IS_PREFMAS_MODE(env)) + ret = __rep_become_readonly_master(env, + &permlsn.generation, &permlsn.lsn); + + __repmgr_permlsn_marshal(env, &permlsn, buf); + if ((t_ret = __repmgr_send_sync_msg(env, conn, + REPMGR_READONLY_RESPONSE, buf, __REPMGR_PERMLSN_SIZE)) != 0) + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "Problem sending readonly response message %d", ret)); + if (ret == 0 && t_ret != 0) + ret = t_ret; + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "Request for readonly_master returning %d", ret)); + return (ret); +} + +/* + * Serve the REPMGR_LSNHIST_REQUEST message by retrieving information from + * this site's LSN history database for the requested gen. If the requested + * gen exists at this site, sends back a REPMGR_LSNHIST_RESPONSE message + * containing the LSN and timestamp at the requested gen and the LSN for the + * next gen if that gen exists (next gen LSN is [0,0] if next gen doesn't + * yet exist at this site.) Sends back a PREFMAS_FAILURE message if the + * requested gen does not yet exist at this site or if there are any errors. + */ +static int +serve_lsnhist_request(env, ip, msg) + ENV *env; + DB_THREAD_INFO *ip; + REPMGR_MESSAGE *msg; +{ + REPMGR_CONNECTION *conn; + DBT *dbt; + __repmgr_lsnhist_match_args lsnhist_match; + __rep_lsn_hist_data_args lsnhist_data, next_lsnhist_data; + __rep_lsn_hist_key_args key; + u_int8_t match_buf[__REPMGR_LSNHIST_MATCH_SIZE]; + DB_LSN next_gen_lsn; + int ret, t_ret; + + RPRINT(env, (env, DB_VERB_REPMGR_MISC, "Serving lsnhist request")); + conn = msg->v.gmdb_msg.conn; + DB_ASSERT(env, conn->version > 0 && conn->version <= DB_REPMGR_VERSION); + /* Read lsn_hist_key incoming payload to get gen being requested. */ + dbt = &msg->v.gmdb_msg.request; + if ((ret = __rep_lsn_hist_key_unmarshal(env, + &key, dbt->data, dbt->size, NULL)) != 0) + return (ret); + if (key.version != REP_LSN_HISTORY_FMT_VERSION) { + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "serve_lsnhist_request version mismatch")); + return (0); + } + + /* + * There's no need to retry if we don't find an lsnhist record for + * requested gen. This site is either a temporary master or a client, + * which means that if it doesn't already have an lsnhist record at + * this gen, it is highly unlikely to get one in the near future. + */ + if ((ret = __rep_get_lsnhist_data(env, + ip, key.gen, &lsnhist_data)) == 0) { + + if ((t_ret = __rep_get_lsnhist_data(env, + ip, key.gen + 1, &next_lsnhist_data)) == 0) + next_gen_lsn = next_lsnhist_data.lsn; + else + ZERO_LSN(next_gen_lsn); + + lsnhist_match.lsn = lsnhist_data.lsn; + lsnhist_match.hist_sec = lsnhist_data.hist_sec; + lsnhist_match.hist_nsec = lsnhist_data.hist_nsec; + lsnhist_match.next_gen_lsn = next_gen_lsn; + __repmgr_lsnhist_match_marshal(env, &lsnhist_match, match_buf); + if ((t_ret = __repmgr_send_sync_msg(env, conn, + REPMGR_LSNHIST_RESPONSE, match_buf, + __REPMGR_LSNHIST_MATCH_SIZE)) != 0) + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "Problem sending lsnhist response message %d", + ret)); + } else if ((t_ret = __repmgr_send_sync_msg(env, conn, + REPMGR_PREFMAS_FAILURE, NULL, 0)) != 0) + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "Problem sending prefmas failure message %d", ret)); + + /* Do not return an error if LSN history record not found. */ + if (ret == DB_NOTFOUND) + ret = 0; + if (ret == 0 && t_ret != 0) + ret = t_ret; + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "Request for lsnhist returning %d", ret)); + return (ret); } /* @@ -917,7 +1363,13 @@ resolve_limbo_int(env, ip) if (orig_status == SITE_PRESENT || orig_status == 0) goto out; - if (IS_ZERO_LSN(db_rep->limbo_failure)) + /* + * It is possible after an autotakeover on a master to have no + * limbo_failure LSN but to have a limbo_victim that was found + * in the gmdb that still needs to be resolved. + */ + if (IS_ZERO_LSN(db_rep->limbo_failure) && + !db_rep->limbo_resolution_needed) goto out; /* @@ -947,7 +1399,8 @@ resolve_limbo_int(env, ip) ip, NULL, &txn, DB_IGNORE_LEASE)) != 0) goto out; - marshal_site_data(env, orig_status, data_buf, &data_dbt); + marshal_site_data(env, + orig_status, site->gmdb_flags, data_buf, &data_dbt); ret = __db_put(db_rep->gmdb, ip, txn, &key_dbt, &data_dbt, 0); if ((t_ret = __db_txn_auto_resolve(env, txn, 0, ret)) != 0 && @@ -980,15 +1433,15 @@ resolve_limbo_int(env, ip) UNLOCK_MUTEX(db_rep->mutex); locked = FALSE; status = NEXT_STATUS(orig_status); - if ((ret = finish_gmdb_update(env, - ip, &key_dbt, orig_status, status, &logrec)) != 0) + if ((ret = finish_gmdb_update(env, ip, + &key_dbt, orig_status, status, site->gmdb_flags, &logrec)) != 0) goto out; /* Track modified membership status in our in-memory sites array. */ LOCK_MUTEX(db_rep->mutex); locked = TRUE; if ((ret = __repmgr_set_membership(env, - addr.host, addr.port, status)) != 0) + addr.host, addr.port, status, site->gmdb_flags)) != 0) goto out; __repmgr_set_sites(env); @@ -1005,14 +1458,15 @@ out: * status is inferred (ADDING -> PRESENT, or DELETING -> 0). * * PUBLIC: int __repmgr_update_membership __P((ENV *, - * PUBLIC: DB_THREAD_INFO *, int, u_int32_t)); + * PUBLIC: DB_THREAD_INFO *, int, u_int32_t, u_int32_t)); */ int -__repmgr_update_membership(env, ip, eid, pstatus) +__repmgr_update_membership(env, ip, eid, pstatus, site_flags) ENV *env; DB_THREAD_INFO *ip; int eid; u_int32_t pstatus; /* Provisional status. */ + u_int32_t site_flags; { DB_REP *db_rep; REPMGR_SITE *site; @@ -1092,7 +1546,7 @@ retry: * those seem even more confusing. */ if ((ret = __repmgr_set_membership(env, - addr.host, addr.port, pstatus)) != 0) + addr.host, addr.port, pstatus, site_flags)) != 0) goto err; __repmgr_set_sites(env); @@ -1108,7 +1562,7 @@ retry: if ((ret = __txn_begin(env, ip, NULL, &txn, DB_IGNORE_LEASE)) != 0) goto err; marshal_site_key(env, &addr, key_buf, &key_dbt, &logrec); - marshal_site_data(env, pstatus, status_buf, &data_dbt); + marshal_site_data(env, pstatus, site_flags, status_buf, &data_dbt); if ((ret = __db_put(db_rep->gmdb, ip, txn, &key_dbt, &data_dbt, 0)) != 0) goto err; @@ -1152,13 +1606,14 @@ retry: locked = FALSE; if ((ret = finish_gmdb_update(env, ip, - &key_dbt, pstatus, ult_status, &logrec)) != 0) + &key_dbt, pstatus, ult_status, site_flags, &logrec)) != 0) goto err; /* Track modified membership status in our in-memory sites array. */ LOCK_MUTEX(db_rep->mutex); locked = TRUE; - ret = __repmgr_set_membership(env, addr.host, addr.port, ult_status); + ret = __repmgr_set_membership(env, addr.host, addr.port, + ult_status, site_flags); __repmgr_set_sites(env); err: @@ -1173,7 +1628,7 @@ err: * that we keep in sync. */ (void)__repmgr_set_membership(env, - addr.host, addr.port, orig_status); + addr.host, addr.port, orig_status, site_flags); } if ((t_ret = __repmgr_cleanup_gmdb_op(env, do_close)) != 0 && ret == 0) @@ -1215,13 +1670,14 @@ retry: UNLOCK_MUTEX(db_rep->mutex); marshal_site_key(env, &addr, key_buf, &key_dbt, &logrec); - if ((ret = finish_gmdb_update(env, - ip, &key_dbt, cur_status, new_status, &logrec)) != 0) + if ((ret = finish_gmdb_update(env, ip, + &key_dbt, cur_status, new_status, site->gmdb_flags, &logrec)) != 0) goto err; /* Track modified membership status in our in-memory sites array. */ LOCK_MUTEX(db_rep->mutex); - ret = __repmgr_set_membership(env, addr.host, addr.port, new_status); + ret = __repmgr_set_membership(env, addr.host, addr.port, + new_status, site->gmdb_flags); __repmgr_set_sites(env); UNLOCK_MUTEX(db_rep->mutex); @@ -1301,11 +1757,11 @@ __repmgr_set_gm_version(env, ip, txn, version) * really deleted. */ static int -finish_gmdb_update(env, ip, key_dbt, prev_status, status, logrec) +finish_gmdb_update(env, ip, key_dbt, prev_status, status, flags, logrec) ENV *env; DB_THREAD_INFO *ip; DBT *key_dbt; - u_int32_t prev_status, status; + u_int32_t prev_status, status, flags; __repmgr_member_args *logrec; { DB_REP *db_rep; @@ -1324,7 +1780,7 @@ finish_gmdb_update(env, ip, key_dbt, prev_status, status, logrec) if (status == 0) ret = __db_del(db_rep->gmdb, ip, txn, key_dbt, 0); else { - marshal_site_data(env, status, data_buf, &data_dbt); + marshal_site_data(env, status, flags, data_buf, &data_dbt); ret = __db_put(db_rep->gmdb, ip, txn, key_dbt, &data_dbt, 0); } if (ret != 0) @@ -1617,16 +2073,18 @@ marshal_site_key(env, addr, buf, dbt, logrec) } static void -marshal_site_data(env, status, buf, dbt) +marshal_site_data(env, status, flags, buf, dbt) ENV *env; u_int32_t status; + u_int32_t flags; u_int8_t *buf; DBT *dbt; { - __repmgr_membership_data_args member_status; + __repmgr_membership_data_args member_data; - member_status.flags = status; - __repmgr_membership_data_marshal(env, &member_status, buf); + member_data.status = status; + member_data.flags = flags; + __repmgr_membership_data_marshal(env, &member_data, buf); DB_INIT_DBT(*dbt, buf, __REPMGR_MEMBERSHIP_DATA_SIZE); } @@ -1640,16 +2098,107 @@ __repmgr_set_sites(env) ENV *env; { DB_REP *db_rep; + REP *rep; int ret; u_int32_t n; u_int i; db_rep = env->rep_handle; + rep = db_rep->region; for (i = 0, n = 0; i < db_rep->site_cnt; i++) { - if (db_rep->sites[i].membership > 0) + /* + * Views do not count towards nsites because they cannot + * vote in elections, become master or contribute to + * durability. + */ + if (db_rep->sites[i].membership > 0 && + !FLD_ISSET(db_rep->sites[i].gmdb_flags, SITE_VIEW)) n++; } ret = __rep_set_nsites_int(env, n); DB_ASSERT(env, ret == 0); + if (FLD_ISSET(rep->config, + REP_C_PREFMAS_MASTER | REP_C_PREFMAS_CLIENT) && + rep->config_nsites > 2) + __db_errx(env, DB_STR("3701", + "More than two sites in preferred master replication group")); +} + +/* + * If a site is rejoining a 2-site repgroup with 2SITE_STRICT off + * and has a rejection because it needs to catch up with the latest + * group membership database, it cannot call an election right away + * because it would win with only its own vote and ignore an existing + * master in the repgroup. Instead, this routine is used to call the + * deferred election after the site has rejoined the repgroup successfully. + */ +static int +rejoin_deferred_election(env) + ENV *env; +{ + DB_REP *db_rep; + u_int32_t flags; + int eid, ret; + + db_rep = env->rep_handle; + LOCK_MUTEX(db_rep->mutex); + + /* + * First, retry all connections so that the election can communicate + * with the other sites. Normally there should only be one other + * site in the repgroup, but it is safest to retry all remote sites + * found in case the group membership changed while we were gone. + */ + FOR_EACH_REMOTE_SITE_INDEX(eid) { + if ((ret = + __repmgr_schedule_connection_attempt(env, eid, TRUE)) != 0) + break; + } + + /* + * Call an immediate, but not a fast, election because a fast + * election reduces the number of votes needed by 1. + */ + flags = ELECT_F_EVENT_NOTIFY; + if (FLD_ISSET(db_rep->region->config, REP_C_ELECTIONS)) + LF_SET(ELECT_F_IMMED); + else + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "Deferred rejoin election, but no elections")); + ret = __repmgr_init_election(env, flags); + + UNLOCK_MUTEX(db_rep->mutex); + return (ret); +} +/* + * If a site is rejoining a preferred master replication group and has a + * rejection because it needs to catch up with the latest group membership + * database, it needs to establish its "regular" connection to the other site + * so that it can proceed through the preferred master startup sequence. + */ +static int +rejoin_connections(env) + ENV *env; +{ + DB_REP *db_rep; + int eid, ret; + + db_rep = env->rep_handle; + ret = 0; + LOCK_MUTEX(db_rep->mutex); + + /* + * Retry all connections. Normally there should only be one other + * site in the repgroup, but it is safest to retry all remote sites + * found in case the group membership changed while we were gone. + */ + FOR_EACH_REMOTE_SITE_INDEX(eid) { + if ((ret = + __repmgr_schedule_connection_attempt(env, eid, TRUE)) != 0) + break; + } + + UNLOCK_MUTEX(db_rep->mutex); + return (ret); } diff --git a/src/repmgr/repmgr_net.c b/src/repmgr/repmgr_net.c index 54e3d066..334fd150 100644 --- a/src/repmgr/repmgr_net.c +++ b/src/repmgr/repmgr_net.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2005, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2005, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -57,6 +57,7 @@ struct sending_msg { * whether the PERM message should be considered durable. */ struct repmgr_permanence { + u_int32_t gen; /* Master generation for LSN. */ DB_LSN lsn; /* LSN whose ack this thread is waiting for. */ u_int threshold; /* Number of client acks to wait for. */ u_int quorum; /* Durability threshold for QUORUM policy. */ @@ -378,7 +379,7 @@ __repmgr_send(dbenv, control, rec, lsnp, eid, flags) goto out; #undef SEND_ONE_CONNECTION - nsites_sent = 1; + nsites_sent = FLD_ISSET(site->gmdb_flags, SITE_VIEW) ? 0 : 1; npeers_sent = F_ISSET(site, SITE_ELECTABLE) ? 1 : 0; missed_peer = FALSE; } @@ -418,7 +419,13 @@ __repmgr_send(dbenv, control, rec, lsnp, eid, flags) nclients = 0; else if ((policy == DB_REPMGR_ACKS_ONE || policy == DB_REPMGR_ACKS_ONE_PEER) && - nclients == 1) { + nclients < 2) { + /* + * Adjust to QUORUM when first other + * participant joins (nclients=1) or when there + * are no other participants but a view joins + * (nclients=0) to get enough acks. + */ nclients = 0; policy = DB_REPMGR_ACKS_QUORUM; } @@ -498,9 +505,16 @@ __repmgr_send(dbenv, control, rec, lsnp, eid, flags) if (nclients > 1 || FLD_ISSET(db_rep->region->config, REP_C_2SITE_STRICT) || - db_rep->active_gmdb_update == gmdb_primary) + db_rep->active_gmdb_update == gmdb_primary) { quorum = nclients / 2; - else + /* + * An unelectable master can't be part of the + * QUORUM policy quorum. + */ + if (rep->priority == 0 && + policy == DB_REPMGR_ACKS_QUORUM) + quorum++; + } else quorum = nclients; if (policy == DB_REPMGR_ACKS_ALL_AVAILABLE) { @@ -560,6 +574,7 @@ __repmgr_send(dbenv, control, rec, lsnp, eid, flags) /* In ALL_PEERS case, display of "needed" might be confusing. */ VPRINT(env, (env, DB_VERB_REPMGR_MISC, "will await acknowledgement: need %u", needed)); + perm.gen = rep->gen; perm.lsn = *lsnp; perm.threshold = needed; perm.policy = policy; @@ -734,8 +749,13 @@ __repmgr_send_broadcast(env, type, control, rec, nsitesp, npeersp, missingp) * useful to keep letting a removed site see updates so that it * learns of its own removal, and will know to rejoin at its * next reboot. + * + * We never count sends to views because views cannot + * contribute to durability, but we always do the sends. */ - if (site->membership == SITE_PRESENT) + if (FLD_ISSET(site->gmdb_flags, SITE_VIEW)) + full_member = FALSE; + else if (site->membership == SITE_PRESENT) full_member = TRUE; else { full_member = FALSE; @@ -802,7 +822,9 @@ send_connection(env, type, conn, msg, sent) REPMGR_MAX_V1_MSG_TYPE, REPMGR_MAX_V2_MSG_TYPE, REPMGR_MAX_V3_MSG_TYPE, - REPMGR_MAX_V4_MSG_TYPE + REPMGR_MAX_V4_MSG_TYPE, + REPMGR_MAX_V5_MSG_TYPE, + REPMGR_MAX_V6_MSG_TYPE }; db_rep = env->rep_handle; @@ -1132,18 +1154,24 @@ got_acks(env, context) has_unacked_peer = FALSE; FOR_EACH_REMOTE_SITE_INDEX(eid) { site = SITE_FROM_EID(eid); - if (site->membership != SITE_PRESENT) + /* + * Do not count an ack from a view because a view cannot + * contribute to durability. + */ + if (FLD_ISSET(site->gmdb_flags, SITE_VIEW)) continue; if (!F_ISSET(site, SITE_HAS_PRIO)) { /* - * Never connected to this site: since we can't know - * whether it's a peer, assume the worst. + * We have not reconnected to this site since the last + * recovery. Since we don't yet know whether it's a + * peer, assume the worst. */ has_unacked_peer = TRUE; continue; } - if (LOG_COMPARE(&site->max_ack, &perm->lsn) >= 0) { + if (site->max_ack_gen == perm->gen && + LOG_COMPARE(&site->max_ack, &perm->lsn) >= 0) { sites_acked++; if (F_ISSET(site, SITE_ELECTABLE)) peers_acked++; @@ -1206,6 +1234,7 @@ __repmgr_bust_connection(env, conn) DB_REP *db_rep; REP *rep; REPMGR_SITE *site; + db_timespec now; u_int32_t flags; int ret, eid; @@ -1259,7 +1288,9 @@ __repmgr_bust_connection(env, conn) } else /* Subordinate connection. */ goto out; - if ((ret = __repmgr_schedule_connection_attempt(env, eid, FALSE)) != 0) + /* Defer connection attempt if rejoining 2SITE_STRICT=off repgroup. */ + if (!db_rep->rejoin_pending && + (ret = __repmgr_schedule_connection_attempt(env, eid, FALSE)) != 0) goto out; /* @@ -1267,11 +1298,47 @@ __repmgr_bust_connection(env, conn) * master, assume that the master may have failed, and call for * an election. But only do this for the connection to the main * master process, not a subordinate one. And only do it if - * we're our site's main process, not a subordinate one. And + * we're our site's listener process, not a subordinate one. And * skip it if the application has configured us not to do * elections. */ if (!IS_SUBORDINATE(db_rep) && eid == rep->master_id) { + if (FLD_ISSET(rep->config, REP_C_AUTOTAKEOVER)) { + /* + * When the connection is from master's listener, if + * there is any other connection from a master's + * subordinate process that could take over as + * listener, we delay the election to allow some time + * for a new master listener to start. At the end of + * the delay, if there is still no master listener, + * call an election. There is a slight chance that + * we will delay the election to wait for an inactive + * connection which would never become the next main + * connection. + */ + TAILQ_FOREACH(conn, &site->sub_conns, entries) { + if (conn->auto_takeover) { + if (!timespecisset( + &db_rep->m_listener_chk)) { + __os_gettime(env, &now, 1); + TIMESPEC_ADD_DB_TIMEOUT(&now, + db_rep->m_listener_wait); + db_rep->m_listener_chk = now; + } + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "Master failure, but delay elections for takeover on master")); + return (0); + } + } + } + + /* Defer election if rejoining 2SITE_STRICT=off repgroup. */ + if (db_rep->rejoin_pending) { + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "Deferring election after rejoin rejection")); + goto out; + } + /* * Even if we're not doing elections, defer the event * notification to later execution in the election @@ -1285,6 +1352,17 @@ __repmgr_bust_connection(env, conn) RPRINT(env, (env, DB_VERB_REPMGR_MISC, "Master failure, but no elections")); + /* + * In preferred master mode, a client that has lost its + * connection to the master uses an election thread to + * restart as master. + */ + if (IS_PREFMAS_MODE(env)) { + RPRINT(env, (env, DB_VERB_REPMGR_MISC, +"bust_connection setting preferred master temp master")); + db_rep->prefmas_pending = start_temp_master; + } + if ((ret = __repmgr_init_election(env, flags)) != 0) goto out; } @@ -1340,25 +1418,59 @@ __repmgr_disable_connection(env, conn) REPMGR_CONNECTION *conn; { DB_REP *db_rep; - REPMGR_SITE *site; + REP *rep; REPMGR_RESPONSE *resp; + REPMGR_SITE *site; + SITEINFO *sites; u_int32_t i; - int eid, ret, t_ret; + int eid, is_subord, orig_state, ret, t_ret; db_rep = env->rep_handle; + rep = db_rep->region; ret = 0; + is_subord = 0; + orig_state = conn->state; conn->state = CONN_DEFUNCT; if (conn->type == REP_CONNECTION) { eid = conn->eid; if (IS_VALID_EID(eid)) { site = SITE_FROM_EID(eid); if (conn != site->ref.conn.in && - conn != site->ref.conn.out) - /* It's a subordinate connection. */ + conn != site->ref.conn.out) { + /* + * It is a subordinate connection to disable. + * Remove it from the subordinate connection + * list, and decrease the number of listener + * candidates by 1 if it is from a subordinate + * rep-aware process that allows takeover. + */ TAILQ_REMOVE(&site->sub_conns, conn, entries); + SET_LISTENER_CAND(conn->auto_takeover, --); + is_subord = 1; + } TAILQ_INSERT_TAIL(&db_rep->connections, conn, entries); conn->ref_count++; + /* + * Do not decrease sites_avail for a subordinate + * connection. + */ + if (site->state == SITE_CONNECTED && !is_subord && + (orig_state == CONN_READY || + orig_state == CONN_CONGESTED)) { + /* + * Some thread orderings can cause a brief + * dip into a negative sites_avail value. + * Once it goes negative it stays negative, + * so avoid this. Future connections will + * be counted correctly. + */ + if (rep->sites_avail > 0) + rep->sites_avail--; + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "disable_conn: EID %lu disabled. sites_avail %lu", + (u_long)eid, (u_long)rep->sites_avail)); + } } conn->eid = -1; } else if (conn->type == APP_CONNECTION) { @@ -1646,8 +1758,10 @@ flatten(env, msg) } /* - * Scan the list of remote sites, returning the first one that is a peer, - * is not the current master, and is available. + * Scan the list of remote sites, returning the first participant that is a + * peer, is not the current master, and is available. If there are no + * available participant peers but there is an available view peer, return the + * first available view peer. */ static REPMGR_SITE * __repmgr_find_available_peer(env) @@ -1656,23 +1770,28 @@ __repmgr_find_available_peer(env) DB_REP *db_rep; REP *rep; REPMGR_CONNECTION *conn; - REPMGR_SITE *site; - u_int i; + REPMGR_SITE *site, *view; + u_int avail, i; db_rep = env->rep_handle; rep = db_rep->region; + view = NULL; FOR_EACH_REMOTE_SITE_INDEX(i) { site = &db_rep->sites[i]; - if (FLD_ISSET(site->config, DB_REPMGR_PEER) && - EID_FROM_SITE(site) != rep->master_id && - site->state == SITE_CONNECTED && + avail = (site->state == SITE_CONNECTED && (((conn = site->ref.conn.in) != NULL && conn->state == CONN_READY) || ((conn = site->ref.conn.out) != NULL && - conn->state == CONN_READY))) + conn->state == CONN_READY))); + if (FLD_ISSET(site->config, DB_REPMGR_PEER) && + !FLD_ISSET(site->gmdb_flags, SITE_VIEW) && + EID_FROM_SITE(site) != rep->master_id && avail) return (site); + if (!view && FLD_ISSET(site->config, DB_REPMGR_PEER) && + FLD_ISSET(site->gmdb_flags, SITE_VIEW) && avail) + view = site; } - return (NULL); + return (view); } /* @@ -1852,6 +1971,7 @@ __repmgr_net_close(env) site->ref.conn.out = NULL; } } + rep->sites_avail = 0; if (db_rep->listen_fd != INVALID_SOCKET) { if (closesocket(db_rep->listen_fd) == SOCKET_ERROR && ret == 0) @@ -1870,22 +1990,28 @@ final_cleanup(env, conn, unused) void *unused; { DB_REP *db_rep; + REP *rep; REPMGR_SITE *site; - int ret, t_ret; + SITEINFO *sites; + int eid, ret, t_ret; COMPQUIET(unused, NULL); db_rep = env->rep_handle; + rep = db_rep->region; + eid = conn->eid; ret = __repmgr_close_connection(env, conn); /* Remove the connection from whatever list it's on, if any. */ - if (conn->type == REP_CONNECTION && IS_VALID_EID(conn->eid)) { - site = SITE_FROM_EID(conn->eid); + if (conn->type == REP_CONNECTION && IS_VALID_EID(eid)) { + site = SITE_FROM_EID(eid); if (site->state == SITE_CONNECTED && (conn == site->ref.conn.in || conn == site->ref.conn.out)) { /* Not on any list, so no need to do anything. */ - } else + } else { TAILQ_REMOVE(&site->sub_conns, conn, entries); + SET_LISTENER_CAND(conn->auto_takeover, --); + } t_ret = __repmgr_destroy_conn(env, conn); } else { diff --git a/src/repmgr/repmgr_posix.c b/src/repmgr/repmgr_posix.c index 0687681a..c49017ff 100644 --- a/src/repmgr/repmgr_posix.c +++ b/src/repmgr/repmgr_posix.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2005, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2005, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ diff --git a/src/repmgr/repmgr_queue.c b/src/repmgr/repmgr_queue.c index 6a381acf..3a51b32b 100644 --- a/src/repmgr/repmgr_queue.c +++ b/src/repmgr/repmgr_queue.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2006, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2006, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -22,13 +22,28 @@ __repmgr_queue_destroy(env) ENV *env; { DB_REP *db_rep; + REP *rep; REPMGR_MESSAGE *m; REPMGR_CONNECTION *conn; + u_int32_t mtype; int ret, t_ret; + COMPQUIET(mtype, 0); + db_rep = env->rep_handle; + rep = db_rep->region; ret = 0; + + /* + * Turn on the DB_EVENT_REP_INQUEUE_FULL event firing. We only do + * this for the main listener process. For a subordinate process, + * it is always turned on. + */ + if (!STAILQ_EMPTY(&db_rep->input_queue.header) && + !IS_SUBORDINATE(db_rep)) + rep->inqueue_full_event_on = 1; + while (!STAILQ_EMPTY(&db_rep->input_queue.header)) { m = STAILQ_FIRST(&db_rep->input_queue.header); STAILQ_REMOVE_HEAD(&db_rep->input_queue.header, entries); @@ -38,8 +53,25 @@ __repmgr_queue_destroy(env) ret == 0) ret = t_ret; } + if (m->msg_hdr.type == REPMGR_OWN_MSG) { + mtype = REPMGR_OWN_MSG_TYPE(m->msg_hdr); + if ((conn = m->v.gmdb_msg.conn) != NULL) { + /* + * A site that removed itself may have already + * closed its connections. + */ + if ((t_ret = __repmgr_close_connection(env, + conn)) != 0 && ret == 0 && + mtype != REPMGR_REMOVE_REQUEST) + ret = t_ret; + if ((t_ret = __repmgr_decr_conn_ref(env, + conn)) != 0 && ret == 0) + ret = t_ret; + } + } __os_free(env, m); } + return (ret); } @@ -60,14 +92,17 @@ __repmgr_queue_get(env, msgp, th) REPMGR_RUNNABLE *th; { DB_REP *db_rep; + REP *rep; REPMGR_MESSAGE *m; #ifdef DB_WIN32 HANDLE wait_events[2]; #endif + u_int32_t msgsize; int ret; ret = 0; db_rep = env->rep_handle; + rep = db_rep->region; while ((m = available_work(env)) == NULL && db_rep->repmgr_status == running && !th->quit_requested) { @@ -104,10 +139,42 @@ __repmgr_queue_get(env, msgp, th) else { STAILQ_REMOVE(&db_rep->input_queue.header, m, __repmgr_message, entries); - db_rep->input_queue.size--; + msgsize = (u_int32_t)m->size; + while (msgsize >= GIGABYTE) { + DB_ASSERT(env, db_rep->input_queue.gbytes > 0); + db_rep->input_queue.gbytes--; + msgsize -= GIGABYTE; + } + if (db_rep->input_queue.bytes < msgsize) { + DB_ASSERT(env, db_rep->input_queue.gbytes > 0); + db_rep->input_queue.gbytes--; + db_rep->input_queue.bytes += GIGABYTE; + } + db_rep->input_queue.bytes -= msgsize; + + /* + * Check if current size is out of the red zone. + * If it is, we will turn on the DB_EVENT_REP_INQUEUE_FULL + * event firing. + * + * We only have the redzone machanism for the main listener + * process. + */ + if (!IS_SUBORDINATE(db_rep) && + rep->inqueue_full_event_on == 0) { + MUTEX_LOCK(env, rep->mtx_repmgr); + if (db_rep->input_queue.gbytes < + rep->inqueue_rz_gbytes || + (db_rep->input_queue.gbytes == + rep->inqueue_rz_gbytes && + db_rep->input_queue.bytes < + rep->inqueue_rz_bytes)) + rep->inqueue_full_event_on = 1; + MUTEX_UNLOCK(env, rep->mtx_repmgr); + } + *msgp = m; } - err: return (ret); } @@ -157,24 +224,55 @@ __repmgr_queue_put(env, msg) REPMGR_MESSAGE *msg; { DB_REP *db_rep; + REP *rep; + u_int32_t msgsize; db_rep = env->rep_handle; + rep = db_rep->region; + + /* + * Drop message if incoming queue contains more messages than the + * limit. See dbenv->repmgr_set_incoming_queue_max() for more + * information. + */ + MUTEX_LOCK(env, rep->mtx_repmgr); + if (db_rep->input_queue.gbytes > rep->inqueue_max_gbytes || + (db_rep->input_queue.gbytes == rep->inqueue_max_gbytes && + db_rep->input_queue.bytes >= rep->inqueue_max_bytes)) { + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "incoming queue limit exceeded")); + STAT(rep->mstat.st_incoming_msgs_dropped++); + if (IS_SUBORDINATE(db_rep) || rep->inqueue_full_event_on) { + DB_EVENT(env, DB_EVENT_REP_INQUEUE_FULL, NULL); + /* + * We will always disable the event firing after + * the queue is full. It will be enabled again + * after the incoming queue size is out of the + * redzone. + * + * We only have the redzone machanism for the main + * listener process. + */ + if (!IS_SUBORDINATE(db_rep)) + rep->inqueue_full_event_on = 0; + } + MUTEX_UNLOCK(env, rep->mtx_repmgr); + __os_free(env, msg); + return (0); + } + MUTEX_UNLOCK(env, rep->mtx_repmgr); STAILQ_INSERT_TAIL(&db_rep->input_queue.header, msg, entries); - db_rep->input_queue.size++; + msgsize = (u_int32_t)msg->size; + while (msgsize >= GIGABYTE) { + msgsize -= GIGABYTE; + db_rep->input_queue.gbytes++; + } + db_rep->input_queue.bytes += msgsize; + if (db_rep->input_queue.bytes >= GIGABYTE) { + db_rep->input_queue.gbytes++; + db_rep->input_queue.bytes -= GIGABYTE; + } return (__repmgr_signal(&db_rep->msg_avail)); } - -/* - * PUBLIC: int __repmgr_queue_size __P((ENV *)); - * - * !!! - * Caller must hold repmgr->mutex. - */ -int -__repmgr_queue_size(env) - ENV *env; -{ - return (env->rep_handle->input_queue.size); -} diff --git a/src/repmgr/repmgr_rec.c b/src/repmgr/repmgr_rec.c index 41827aff..568df45d 100644 --- a/src/repmgr/repmgr_rec.c +++ b/src/repmgr/repmgr_rec.c @@ -1,3 +1,11 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 2014, 2015 Oracle and/or its affiliates. All rights reserved. + * + * $Id$ + */ + #include "db_config.h" #include "db_int.h" @@ -31,7 +39,7 @@ __repmgr_member_recover(env, dbtp, lsnp, op, info) /* * The annotation log record describes the update in enough detail for - * us to be able to optimize our tracking of it at clients sites. + * us to be able to optimize our tracking of it at client sites. * However, for now we just simply reread the whole (small) database * each time, since changes happen so seldom (and we need to have the * code for reading the whole thing anyway, for other cases). diff --git a/src/repmgr/repmgr_sel.c b/src/repmgr/repmgr_sel.c index ba14368f..c32dad25 100644 --- a/src/repmgr/repmgr_sel.c +++ b/src/repmgr/repmgr_sel.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2006, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2006, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -12,7 +12,7 @@ typedef int (*HEARTBEAT_ACTION) __P((ENV *)); -static int accept_handshake __P((ENV *, REPMGR_CONNECTION *, char *)); +static int accept_handshake __P((ENV *, REPMGR_CONNECTION *, char *, int *)); static int accept_v1_handshake __P((ENV *, REPMGR_CONNECTION *, char *)); static void check_min_log_file __P((ENV *)); static int dispatch_msgin __P((ENV *, REPMGR_CONNECTION *)); @@ -23,13 +23,18 @@ static int process_parameters __P((ENV *, static int read_version_response __P((ENV *, REPMGR_CONNECTION *)); static int record_permlsn __P((ENV *, REPMGR_CONNECTION *)); static int __repmgr_call_election __P((ENV *)); +static int __repmgr_check_listener __P((ENV *)); +static int __repmgr_check_master_listener __P((ENV *)); static int __repmgr_connector_main __P((ENV *, REPMGR_RUNNABLE *)); static void *__repmgr_connector_thread __P((void *)); static int __repmgr_next_timeout __P((ENV *, db_timespec *, HEARTBEAT_ACTION *)); +static int __repmgr_reset_last_rcvd __P((ENV *)); static int __repmgr_retry_connections __P((ENV *)); static int __repmgr_send_heartbeat __P((ENV *)); -static int __repmgr_try_one __P((ENV *, int)); +static int __repmgr_start_takeover __P((ENV *)); +static void *__repmgr_takeover_thread __P((void *)); +static int __repmgr_try_one __P((ENV *, int, int)); static int resolve_collision __P((ENV *, REPMGR_SITE *, REPMGR_CONNECTION *)); static int send_version_response __P((ENV *, REPMGR_CONNECTION *)); @@ -49,17 +54,24 @@ void * __repmgr_select_thread(argsp) void *argsp; { - REPMGR_RUNNABLE *args; ENV *env; + DB_THREAD_INFO *ip; int ret; + REPMGR_RUNNABLE *args; args = argsp; env = args->env; + ip = NULL; + ret = 0; - if ((ret = __repmgr_select_loop(env)) != 0) { + ENV_ENTER_RET(env, ip, ret); + if (ret != 0 || (ret = __repmgr_select_loop(env)) != 0) { __db_err(env, ret, DB_STR("3614", "select loop failed")); + ENV_LEAVE(env, ip); (void)__repmgr_thread_failure(env, ret); } + if (ret == 0) + ENV_LEAVE(env, ip); return (NULL); } @@ -71,12 +83,19 @@ __repmgr_bow_out(env) ENV *env; { DB_REP *db_rep; + REP *rep; int ret; db_rep = env->rep_handle; + rep = db_rep->region; LOCK_MUTEX(db_rep->mutex); ret = __repmgr_stop_threads(env); UNLOCK_MUTEX(db_rep->mutex); + /* + * Reset sites_avail so that it will be calculated correctly if this + * site rejoins the group in the future. + */ + rep->sites_avail = 0; DB_EVENT(env, DB_EVENT_REP_LOCAL_SITE_REMOVED, NULL); return (ret); } @@ -187,23 +206,53 @@ __repmgr_compute_timeout(env, timeout) db_rep = env->rep_handle; /* - * There are two factors to consider: are heartbeats in use? and, do we + * There are four factors to consider: are heartbeats in use? do we * have any sites with broken connections that we ought to retry? + * is there a listener process running locally? do we need to call + * an election if no master listener exists? */ have_timeout = __repmgr_next_timeout(env, &t, NULL); /* List items are in order, so we only have to examine the first one. */ if (!TAILQ_EMPTY(&db_rep->retries)) { retry = TAILQ_FIRST(&db_rep->retries); - if (have_timeout) { + if (have_timeout) /* Choose earliest timeout deadline. */ t = timespeccmp(&retry->time, &t, <) ? retry->time : t; - } else { + else { t = retry->time; have_timeout = TRUE; } } + /* Check listener every timeout in subordinate rep-aware process. */ + if (IS_LISTENER_CAND(db_rep)) { + if (!timespecisset(&db_rep->l_listener_chk)) { + __os_gettime(env, &now, 1); + TIMESPEC_ADD_DB_TIMEOUT(&now, db_rep->l_listener_wait); + db_rep->l_listener_chk = now; + } + if (have_timeout) + t = timespeccmp(&db_rep->l_listener_chk, &t, <) ? + db_rep->l_listener_chk : t; + else { + t = db_rep->l_listener_chk; + have_timeout = TRUE; + } + } + + /* Check master listener if needed. */ + if (FLD_ISSET(db_rep->region->config, REP_C_AUTOTAKEOVER) && + timespecisset(&db_rep->m_listener_chk)) { + if (have_timeout) + t = timespeccmp(&db_rep->m_listener_chk, &t, <) ? + db_rep->m_listener_chk : t; + else { + t = db_rep->m_listener_chk; + have_timeout = TRUE; + } + } + if (have_timeout) { __os_gettime(env, &now, 1); if (timespeccmp(&now, &t, >=)) @@ -242,7 +291,17 @@ __repmgr_next_timeout(env, deadline, action) if (rep->master_id == db_rep->self_eid && rep->heartbeat_frequency > 0) { - t = db_rep->last_bcast; + /* + * A temporary master in preferred master mode must send + * regular heartbeats regardless of other activity because + * the preferred master requires a heartbeat to take over as + * master after it has synced with the temporary master. + */ + if (IS_PREFMAS_MODE(env) && + FLD_ISSET(rep->config, REP_C_PREFMAS_CLIENT)) + t = db_rep->last_hbeat; + else + t = db_rep->last_bcast; TIMESPEC_ADD_DB_TIMEOUT(&t, rep->heartbeat_frequency); my_action = __repmgr_send_heartbeat; } else if ((master = __repmgr_connected_master(env)) != NULL && @@ -301,6 +360,24 @@ __repmgr_send_heartbeat(env) db_rep = env->rep_handle; rep = db_rep->region; + ret = 0; + + /* + * Check test hook preventing heartbeats and connection attempts. + * This is used to create and maintain a dupmaster condition in + * a test until the test hook is rescinded. + */ + DB_TEST_SET(env->test_abort, DB_TEST_REPMGR_HEARTBEAT); + + /* + * Track last heartbeat for temporary master in preferred master + * mode so that it will send regular heartbeats regardless of + * other activity. + */ + if (IS_PREFMAS_MODE(env) && + FLD_ISSET(rep->config, REP_C_PREFMAS_CLIENT) && + rep->master_id == db_rep->self_eid) + __os_gettime(env, &db_rep->last_hbeat, 1); permlsn.generation = rep->gen; if ((ret = __rep_get_maxpermlsn(env, &permlsn.lsn)) != 0) @@ -310,8 +387,11 @@ __repmgr_send_heartbeat(env) control.size = __REPMGR_PERMLSN_SIZE; DB_INIT_DBT(rec, NULL, 0); - return (__repmgr_send_broadcast(env, - REPMGR_HEARTBEAT, &control, &rec, &unused1, &unused2, &unused3)); + ret =__repmgr_send_broadcast(env, + REPMGR_HEARTBEAT, &control, &rec, &unused1, &unused2, &unused3); + +DB_TEST_RECOVERY_LABEL + return (ret); } /* @@ -373,6 +453,8 @@ __repmgr_check_timeouts(env) HEARTBEAT_ACTION action; int ret; + ret = 0; + /* * Figure out the next heartbeat-related thing to be done. Then, if * it's time to do it, do so. @@ -384,7 +466,342 @@ __repmgr_check_timeouts(env) return (ret); } - return (__repmgr_retry_connections(env)); + /* Check the existence of local listener. */ + if ((ret = __repmgr_check_listener(env)) != 0) + return (ret); + + /* Check the existence of master listener. */ + if ((ret = __repmgr_check_master_listener(env)) != 0) + return (ret); + + /* + * Check test hook preventing heartbeats and connection attempts. + * This is used to create and maintain a dupmaster condition in + * a test until the test hook is rescinded. + */ + DB_TEST_SET(env->test_abort, DB_TEST_REPMGR_HEARTBEAT); + + ret = __repmgr_retry_connections(env); + +DB_TEST_RECOVERY_LABEL + return (ret); +} + +/* + * Check the existence of the listener process on the local site. If one + * does not exist and the current process is a subordinate rep-aware process, + * then start a takeover thread to covert this process to the listener process. + */ +static int +__repmgr_check_listener(env) + ENV *env; +{ + DB_REP *db_rep; + REP *rep; + SITEINFO *sites; + db_timespec t; + int ret; + + db_rep = env->rep_handle; + rep = db_rep->region; + ret = 0; + + /* + * Only subordinate rep-aware process can take over listener role, so + * no need to check listener in listener process or rep unaware process. + */ + if (!IS_LISTENER_CAND(db_rep)) + return (0); + + /* + * If the listener quits due to site removal, no subordinate process + * should take over as listener as the current site is not expected + * to be active in the group. Check the status from the site array + * in the shared region instead of that in the GMDB. We do this + * because the GMDB doesn't apply the change yet when replication + * is stopped on the removed site. + */ + sites = R_ADDR(env->reginfo, rep->siteinfo_off); + if (sites[rep->self_eid].status == SITE_DELETING) + return (0); + + /* + * Check the listener after timeout. If there is no listener, we + * take over. During takeover, we will refresh all connections. + * A subordinate process does not have an up-to-date site list, so sync + * up addresses from the in-memory site array before takeover. + */ + __os_gettime(env, &t, 1); + if (timespeccmp(&t, &db_rep->l_listener_chk, >=)) { + /* Compute the next timeout. */ + TIMESPEC_ADD_DB_TIMEOUT(&t, db_rep->l_listener_wait); + db_rep->l_listener_chk = t; + + /* Check if site address information needs to be refreshed. */ + if ((rep->siteinfo_seq > db_rep->siteinfo_seq) && + (ret = __repmgr_sync_siteaddr(env)) != 0) + return (ret); + + if (rep->listener == 0) + ret = __repmgr_start_takeover(env); + } + return (ret); +} + +/* + * Start a thread to take over the listener role in the current subordinate + * process. + */ +static int +__repmgr_start_takeover(env) + ENV *env; +{ + DB_REP *db_rep; + REPMGR_RUNNABLE *th; + int ret; + + db_rep = env->rep_handle; + th = db_rep->takeover_thread; + if (th == NULL) { + if ((ret = __os_calloc(env, 1, sizeof(REPMGR_RUNNABLE), + &th)) != 0) + return (ret); + db_rep->takeover_thread = th; + } else if (th->finished) { + if ((ret = __repmgr_thread_join(th)) != 0) + return (ret); + } else { + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "takeover thread still running")); + return (0); + } + th->run = __repmgr_takeover_thread; + if ((ret = __repmgr_thread_start(env, th)) != 0) { + __os_free(env, th); + db_rep->takeover_thread = NULL; + } + return (ret); +} + +/* + * Take over listener role in the current subordinate process. + */ +static void * +__repmgr_takeover_thread(argsp) + void *argsp; +{ + DB_REP *db_rep; + DB_THREAD_INFO *ip; + ENV *env; + REP *rep; + REPMGR_RUNNABLE *th; + int nthreads, ret, save_policy; + + th = argsp; + env = th->env; + db_rep = env->rep_handle; + ip = NULL; + rep = db_rep->region; + ret = 0; + + ENV_ENTER_RET(env, ip, ret); + if (ret != 0) + goto out; + RPRINT(env, (env, DB_VERB_REPMGR_MISC, "starting takeover thread")); + /* + * It is likely that there is an old heartbeat ready to expire + * immediately upon restarting repmgr, leading to an unnecessary + * election. Reset the expiration countdown here to avoid this. + */ + if ((ret = __repmgr_reset_last_rcvd(env)) != 0) + goto out; + /* + * If nthreads is set to be 0 in the current subordinate process, use + * the value in the last listener. The nthreads should be larger than + * 0 in listener. + */ + nthreads = db_rep->config_nthreads == 0 ? (int)rep->listener_nthreads : + db_rep->config_nthreads; + /* + * It is possible that this subordinate process does not have intact + * connections to the other sites. For most ack policies, restarting + * repmgr will wait for acks when it commits its transaction to reload + * the gmdb. Temporarily set the ack policy to NONE for the takeover + * so that it is not delayed waiting for acks that can never come. + */ + save_policy = rep->perm_policy; + rep->perm_policy = DB_REPMGR_ACKS_NONE; + /* + * Restart the repmgr as listener. If DB_REP_IGNORE is returned, + * the current process has become listener. If DB_REP_UNAVAIL is + * returned, the site has been removed from the group and no listener + * should be started. For any other error, if the replication is + * stopped because of the takeover thread, we will notify the + * application. + */ + ret = __repmgr_start_int(env, nthreads, F_ISSET(rep, REP_F_MASTER) ? + DB_REP_MASTER : DB_REP_CLIENT); + if (ret == 0 && !IS_SUBORDINATE(db_rep) && + db_rep->repmgr_status == running) { + STAT(rep->mstat.st_takeovers++); + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "finished takeover and became listener")); + } else if (ret != 0 && db_rep->repmgr_status == stopped) { + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "failed to take over, repmgr was stopped")); + DB_EVENT(env, DB_EVENT_REP_AUTOTAKEOVER_FAILED, NULL); + } else { + /* The current process is not changed to listener. */ + RPRINT(env, (env, DB_VERB_REPMGR_MISC, "failed to take over")); + } + rep->perm_policy = save_policy; + RPRINT(env, (env, DB_VERB_REPMGR_MISC, "takeover thread is exiting")); + ENV_LEAVE(env, ip); +out: th->finished = TRUE; + return (NULL); +} + +/* + * Reset the last_rcvd_timestamp to restart the wait for a heartbeat + * monitor expiration. + */ +static int +__repmgr_reset_last_rcvd(env) + ENV *env; +{ + DB_REP *db_rep; + REPMGR_SITE *master; + + db_rep = env->rep_handle; + + LOCK_MUTEX(db_rep->mutex); + if ((master = __repmgr_connected_master(env)) != NULL) + __os_gettime(env, &master->last_rcvd_timestamp, 1); + UNLOCK_MUTEX(db_rep->mutex); + return (0); +} + +/* + * Monitor the connection to master listener. When the master listener is + * disconnected and some other master process might take over as listener + * soon, we will delay the election. After the delay if there is still no + * connection from master listener, call an election then. + */ +static int +__repmgr_check_master_listener(env) + ENV *env; +{ + DB_REP *db_rep; + REP *rep; + REPMGR_SITE *master; + db_timespec t; + u_int32_t flags; + int ret; + + db_rep = env->rep_handle; + rep = db_rep->region; + ret = 0; + + /* + * We only check for a master listener if m_listener_chk is set. + * The field is only set when __repmgr_bust_connection() previously + * detected the loss of our connection to the master listener. + * If rep->master_id is invalid, wait until it is ready to check. + */ + if (!FLD_ISSET((db_rep)->region->config, REP_C_AUTOTAKEOVER) || + !timespecisset(&db_rep->m_listener_chk) || + !IS_VALID_EID(rep->master_id)) + return (0); + + __os_gettime(env, &t, 1); + if (timespeccmp(&t, &db_rep->m_listener_chk, >=)) { + master = SITE_FROM_EID(db_rep->region->master_id); + if (master->ref.conn.out == NULL && + master->ref.conn.in == NULL) { + flags = ELECT_F_EVENT_NOTIFY; + if (FLD_ISSET(db_rep->region->config, REP_C_ELECTIONS)) + LF_SET(ELECT_F_IMMED | ELECT_F_FAST); + else + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "Master failure, but no elections")); + + /* + * In preferred master mode, a client that has lost its + * connection to the master uses an election thread to + * restart as master. + */ + if (IS_PREFMAS_MODE(env)) { + RPRINT(env, (env, DB_VERB_REPMGR_MISC, +"check_master_listener setting preferred master temp master")); + db_rep->prefmas_pending = start_temp_master; + } + + ret = __repmgr_init_election(env, flags); + } + /* + * If the delay has expired reset m_listener_chk. We reset + * it whether or not the master listener process comes back + * so that we will not continue checking for a master listener + * indefinitely. + */ + timespecclear(&db_rep->m_listener_chk); + } + return (ret); +} + +/* + * Wake up I/O waiting in selector thread, refresh connections to all connected + * and present sites. + * + * PUBLIC: int __repmgr_refresh_selector __P((ENV *)); + */ +int +__repmgr_refresh_selector(env) + ENV *env; +{ + DB_REP *db_rep; + REP *rep; + REPMGR_RETRY *retry; + REPMGR_SITE *site; + SITEINFO *sites; + int eid, ret; + + db_rep = env->rep_handle; + rep = db_rep->region; + + if ((ret = __repmgr_wake_main_thread(env)) != 0) + return (ret); + + FOR_EACH_REMOTE_SITE_INDEX(eid) { + SET_LISTENER_CAND(1, = 0); + site = SITE_FROM_EID(eid); + + /* + * It is possible some sites were left in a paused state + * during the switch, so they have to be removed from the + * retry list. + */ + if (site->state == SITE_PAUSING) { + retry = site->ref.retry; + if (retry != NULL) { + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "Removing site from retry list eid %lu", + (u_long)eid)); + TAILQ_REMOVE(&db_rep->retries, retry, entries); + __os_free(env, retry); + site->ref.retry = NULL; + } + + } + /* + * Try to connect to any site that is now PRESENT after + * rereading the gmdb. + */ + if (site->membership == SITE_PRESENT && + (ret = __repmgr_try_one(env, eid, TRUE)) != 0) + return (ret); + } + return (0); } /* @@ -415,10 +832,11 @@ __repmgr_retry_connections(env) __os_free(env, retry); DB_ASSERT(env, IS_VALID_EID(eid)); site = SITE_FROM_EID(eid); + site->ref.retry = NULL; DB_ASSERT(env, site->state == SITE_PAUSING); if (site->membership == SITE_PRESENT) { - if ((ret = __repmgr_try_one(env, eid)) != 0) + if ((ret = __repmgr_try_one(env, eid, FALSE)) != 0) return (ret); } else site->state = SITE_IDLE; @@ -437,11 +855,23 @@ __repmgr_first_try_connections(env) ENV *env; { DB_REP *db_rep; + REP *rep; REPMGR_SITE *site; + SITEINFO *sites; int eid, ret; db_rep = env->rep_handle; + rep = db_rep->region; + + /* + * Check test hook preventing heartbeats and connection attempts. + * This is used to create and maintain a dupmaster condition in + * a test until the test hook is rescinded. + */ + DB_TEST_SET(env->test_abort, DB_TEST_REPMGR_HEARTBEAT); + FOR_EACH_REMOTE_SITE_INDEX(eid) { + SET_LISTENER_CAND(1, = 0); site = SITE_FROM_EID(eid); /* * Normally all sites would be IDLE here. But if a user thread @@ -453,19 +883,22 @@ __repmgr_first_try_connections(env) */ if (site->state == SITE_IDLE && site->membership == SITE_PRESENT && - (ret = __repmgr_try_one(env, eid)) != 0) + (ret = __repmgr_try_one(env, eid, FALSE)) != 0) return (ret); } +DB_TEST_RECOVERY_LABEL return (0); } /* - * Starts a thread to open a connection to the site at the given EID. + * Starts a thread to open a connection to the site at the given EID. We might + * have no connection to the site, or an existing connection to be replaced. */ static int -__repmgr_try_one(env, eid) +__repmgr_try_one(env, eid, refresh) ENV *env; int eid; + int refresh; { DB_REP *db_rep; REPMGR_SITE *site; @@ -488,13 +921,22 @@ __repmgr_try_one(env, eid) "eid %lu previous connector thread still running; will retry", (u_long)eid)); return (__repmgr_schedule_connection_attempt(env, - eid, FALSE)); + eid, refresh)); } site->state = SITE_CONNECTING; th->run = __repmgr_connector_thread; - th->args.eid = eid; + th->args.conn_th.eid = eid; + /* + * The flag CONNECT_F_REFRESH indicates an immediate connection attempt + * should be scheduled if the current connection attempt fails. It is + * turned on before the first attempt to refresh the connection but + * turned off if the first attempt fails. In this way, when refreshing + * the connection, there will be at most two immediate connection + * attempts, after that, retry as usual. + */ + th->args.conn_th.flags = refresh ? CONNECT_F_REFRESH : 0; if ((ret = __repmgr_thread_start(env, th)) != 0) { __os_free(env, th); site->connector = NULL; @@ -506,21 +948,33 @@ static void * __repmgr_connector_thread(argsp) void *argsp; { - REPMGR_RUNNABLE *th; ENV *env; + DB_THREAD_INFO *ip; + REPMGR_RUNNABLE *th; int ret; th = argsp; env = th->env; + ip = NULL; + ret = 0; - RPRINT(env, (env, DB_VERB_REPMGR_MISC, - "starting connector thread, eid %u", th->args.eid)); - if ((ret = __repmgr_connector_main(env, th)) != 0) { + ENV_ENTER_RET(env, ip, ret); + if (ret == 0) + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "starting connector thread, eid %u", + th->args.conn_th.eid)); + if (ret != 0 || (ret = __repmgr_connector_main(env, th)) != 0) { __db_err(env, ret, DB_STR("3617", "connector thread failed")); + RPRINT(env, (env, + DB_VERB_REPMGR_MISC, "connector thread is exiting")); + ENV_LEAVE(env, ip); (void)__repmgr_thread_failure(env, ret); } - RPRINT(env, (env, DB_VERB_REPMGR_MISC, "connector thread is exiting")); - + if (ret == 0) { + RPRINT(env, (env, + DB_VERB_REPMGR_MISC, "connector thread is exiting")); + ENV_LEAVE(env, ip); + } th->finished = TRUE; return (NULL); } @@ -542,8 +996,8 @@ __repmgr_connector_main(env, th) ret = 0; LOCK_MUTEX(db_rep->mutex); - DB_ASSERT(env, IS_VALID_EID(th->args.eid)); - site = SITE_FROM_EID(th->args.eid); + DB_ASSERT(env, IS_VALID_EID(th->args.conn_th.eid)); + site = SITE_FROM_EID(th->args.conn_th.eid); if (site->state != SITE_CONNECTING && db_rep->repmgr_status == stopped) goto unlock; @@ -563,7 +1017,8 @@ __repmgr_connector_main(env, th) UNLOCK_MUTEX(db_rep->mutex); if ((ret = __repmgr_connect(env, &netaddr, &conn, &err)) == 0) { - DB_EVENT(env, DB_EVENT_REP_CONNECT_ESTD, &th->args.eid); + DB_EVENT(env, + DB_EVENT_REP_CONNECT_ESTD, &th->args.conn_th.eid); LOCK_MUTEX(db_rep->mutex); if ((ret = __repmgr_set_nonblock_conn(conn)) != 0) { __db_err(env, ret, DB_STR("3618", @@ -571,33 +1026,53 @@ __repmgr_connector_main(env, th) goto cleanup; } conn->type = REP_CONNECTION; - site = SITE_FROM_EID(th->args.eid); + site = SITE_FROM_EID(th->args.conn_th.eid); if (site->state != SITE_CONNECTING || db_rep->repmgr_status == stopped) goto cleanup; - conn->eid = th->args.eid; - site = SITE_FROM_EID(th->args.eid); - site->ref.conn.out = conn; + conn->eid = th->args.conn_th.eid; + site = SITE_FROM_EID(th->args.conn_th.eid); + /* + * If there is an existing outgoing connection, disable it and + * replace it with a new connection. The sites for a formerly + * subordinate handle that is now taking over might still be + * SITE_CONNECTING. Set to SITE_CONNECTED before disabling + * connection so that sites_avail is correctly maintained. + */ site->state = SITE_CONNECTED; + if (site->ref.conn.out != NULL) + (void)__repmgr_disable_connection(env, + site->ref.conn.out); + site->ref.conn.out = conn; __os_gettime(env, &site->last_rcvd_timestamp, 1); ret = __repmgr_wake_main_thread(env); } else if (ret == DB_REP_UNAVAIL) { /* Retryable error while trying to connect: retry later. */ - info.eid = th->args.eid; + info.eid = th->args.conn_th.eid; info.error = err; DB_EVENT(env, DB_EVENT_REP_CONNECT_TRY_FAILED, &info); STAT(db_rep->region->mstat.st_connect_fail++); LOCK_MUTEX(db_rep->mutex); - site = SITE_FROM_EID(th->args.eid); + site = SITE_FROM_EID(th->args.conn_th.eid); if (site->state != SITE_CONNECTING || db_rep->repmgr_status == stopped) { ret = 0; goto unlock; } + /* + * If it fails to create a new outgoing connection to replace + * the existing one in the first attempt, schedule another + * immediate attempt. If it is our second attempt, disable + * the existing connections and retry as normal. + */ + if (site->ref.conn.out != NULL && th->args.conn_th.flags == 0) + (void)__repmgr_disable_connection(env, + site->ref.conn.out); ret = __repmgr_schedule_connection_attempt(env, - th->args.eid, FALSE); + th->args.conn_th.eid, + th->args.conn_th.flags == CONNECT_F_REFRESH); } else goto out; @@ -842,6 +1317,7 @@ prepare_input(env, conn) if ((ret = __os_malloc(env, memsize, &membase)) != 0) return (ret); conn->input.rep_message = membase; + conn->input.rep_message->size = memsize; conn->input.rep_message->msg_hdr = msg_hdr; conn->input.rep_message->v.repmsg.originating_eid = conn->eid; @@ -876,6 +1352,7 @@ prepare_input(env, conn) if ((ret = __os_malloc(env, memsize, &membase)) != 0) return (ret); conn->input.rep_message = membase; + conn->input.rep_message->size = memsize; conn->input.rep_message->msg_hdr = msg_hdr; conn->input.rep_message->v.appmsg.conn = conn; @@ -891,6 +1368,7 @@ prepare_input(env, conn) if ((ret = __os_malloc(env, size, &membase)) != 0) return (ret); conn->input.rep_message = membase; + conn->input.rep_message->size = size; conn->input.rep_message->msg_hdr = msg_hdr; /* @@ -1065,16 +1543,18 @@ dispatch_msgin(env, conn) ENV *env; REPMGR_CONNECTION *conn; { + DBT *dbt; DB_REP *db_rep; - REPMGR_SITE *site; - REPMGR_RUNNABLE *th; + REP *rep; REPMGR_RESPONSE *resp; - DBT *dbt; + REPMGR_RUNNABLE *th; + REPMGR_SITE *site; char *hostname; - int eid, ret; + int eid, ret, subord; DB_ASSERT(env, conn->reading_phase == DATA_PHASE); db_rep = env->rep_handle; + rep = db_rep->region; switch (conn->state) { case CONN_CONNECTED: @@ -1129,9 +1609,22 @@ dispatch_msgin(env, conn) dbt = &conn->input.repmgr_msg.rec; hostname = dbt->data; hostname[dbt->size-1] = '\0'; - if ((ret = accept_handshake(env, conn, hostname)) != 0) + if ((ret = accept_handshake(env, + conn, hostname, &subord)) != 0) return (ret); conn->state = CONN_READY; + site = SITE_FROM_EID(conn->eid); + /* + * Do not increase sites_avail redundantly for an + * incoming subordinate connection. + */ + if (conn->type == REP_CONNECTION && + site->state == SITE_CONNECTED && !subord) { + rep->sites_avail++; + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "msgin: EID %lu CONNECTED, READY. sites_avail %lu", + (u_long)conn->eid, (u_long)rep->sites_avail)); + } break; case REPMGR_OWN_MSG: /* @@ -1279,9 +1772,11 @@ process_own_msg(env, conn) REPMGR_SITE *site; REPMGR_MESSAGE *msg; __repmgr_connect_reject_args reject; + __repmgr_v4connect_reject_args v4reject; __repmgr_parm_refresh_args parms; int ret; + db_rep = env->rep_handle; ret = 0; /* * Set "msg" to point to the message struct. If we do all necessary @@ -1293,28 +1788,61 @@ process_own_msg(env, conn) switch (REPMGR_OWN_MSG_TYPE((msg = conn->input.rep_message)->msg_hdr)) { case REPMGR_CONNECT_REJECT: dbt = &msg->v.gmdb_msg.request; - if ((ret = __repmgr_connect_reject_unmarshal(env, - &reject, dbt->data, dbt->size, NULL)) != 0) - return (DB_REP_UNAVAIL); + if (conn->version < 5) { + if ((ret = __repmgr_v4connect_reject_unmarshal(env, + &v4reject, dbt->data, dbt->size, NULL)) != 0) + return (DB_REP_UNAVAIL); + reject.version = v4reject.version; + reject.gen = v4reject.gen; + reject.status = 0; + } else { + if ((ret = __repmgr_connect_reject_unmarshal(env, + &reject, dbt->data, dbt->size, NULL)) != 0) + return (DB_REP_UNAVAIL); + } /* * If we're being rejected by someone who has more up-to-date - * membership information than we do, it means we have been - * removed from the group. If we've just gotten started, we can - * make one attempt at automatically rejoining; otherwise we bow - * out gracefully. + * membership information than we do, it means we are not in + * the group. If we've just gotten started, or our status is + * adding, we can make one attempt at automatically rejoining; + * otherwise we bow out gracefully. */ RPRINT(env, (env, DB_VERB_REPMGR_MISC, - "got rejection msg citing version %lu/%lu", - (u_long)reject.gen, (u_long)reject.version)); + "got rejection msg citing version %lu/%lu mine %lu/%lu membership %lu", + (u_long)reject.gen, (u_long)reject.version, + (u_long)db_rep->member_version_gen, + (u_long)db_rep->membership_version, + (u_long)reject.status)); if (__repmgr_gmdb_version_cmp(env, reject.gen, reject.version) > 0) { - if (env->rep_handle->seen_repmsg) + if (db_rep->seen_repmsg && reject.status != SITE_ADDING) ret = DB_DELETED; - else if ((ret = __repmgr_defer_op(env, - REPMGR_REJOIN)) == 0) - ret = DB_REP_UNAVAIL; + else { + /* + * If 2SITE_STRICT is off, we are likely to + * win an election with our own vote before + * discovering there is already a master. + * Set indicator to defer the election until + * after rejoining group. + * + * In preferred master mode, either site + * should defer the election (which + * executes the preferred master startup + * code and only calls an election if it is + * safe) and also avoid scheduling an extra + * reconnect attempt in bust_connection() + * by setting the indicator. + */ + if (!FLD_ISSET(db_rep->region->config, + REP_C_2SITE_STRICT) || + IS_PREFMAS_MODE(env)) + db_rep->rejoin_pending = TRUE; + if ((ret = __repmgr_defer_op(env, + REPMGR_REJOIN)) == 0) + ret = DB_REP_UNAVAIL; + } } else ret = DB_REP_UNAVAIL; DB_ASSERT(env, ret != 0); @@ -1332,7 +1860,6 @@ process_own_msg(env, conn) if ((ret = __repmgr_parm_refresh_unmarshal(env, &parms, dbt->data, dbt->size, NULL)) != 0) return (DB_REP_UNAVAIL); - db_rep = env->rep_handle; DB_ASSERT(env, conn->type == REP_CONNECTION && IS_KNOWN_REMOTE_SITE(conn->eid)); site = SITE_FROM_EID(conn->eid); @@ -1348,8 +1875,15 @@ process_own_msg(env, conn) case REPMGR_GM_FORWARD: case REPMGR_JOIN_REQUEST: case REPMGR_JOIN_SUCCESS: + case REPMGR_LSNHIST_REQUEST: + case REPMGR_LSNHIST_RESPONSE: + case REPMGR_PREFMAS_FAILURE: + case REPMGR_PREFMAS_SUCCESS: + case REPMGR_READONLY_MASTER: + case REPMGR_READONLY_RESPONSE: case REPMGR_REMOVE_REQUEST: case REPMGR_RESOLVE_LIMBO: + case REPMGR_RESTART_CLIENT: default: __db_errx(env, DB_STR_A("3677", "unexpected msg type %lu in process_own_msg", "%lu"), @@ -1482,6 +2016,8 @@ __repmgr_send_handshake(env, conn, opt, optlen, flags) cntrl_len = __REPMGR_V3HANDSHAKE_SIZE; break; case 4: + case 5: + case 6: cntrl_len = __REPMGR_HANDSHAKE_SIZE; break; default: @@ -1513,6 +2049,8 @@ __repmgr_send_handshake(env, conn, opt, optlen, flags) __repmgr_v3handshake_marshal(env, &v3hs, p); break; case 4: + case 5: + case 6: hs.port = my_addr->port; hs.alignment = MEM_ALIGN; hs.ack_policy = (u_int32_t)rep->perm_policy; @@ -1551,11 +2089,14 @@ read_version_response(env, conn) DB_REP *db_rep; __repmgr_version_confirmation_args conf; DBT vi; + REP *rep; + REPMGR_SITE *site; char *hostname; u_int32_t flags; - int ret; + int ret, subord; db_rep = env->rep_handle; + rep = db_rep->region; if ((ret = __repmgr_find_version_info(env, conn, &vi)) != 0) return (ret); @@ -1581,14 +2122,37 @@ read_version_response(env, conn) return (DB_REP_UNAVAIL); } - if ((ret = accept_handshake(env, conn, hostname)) != 0) + if ((ret = accept_handshake(env, conn, hostname, &subord)) != 0) return (ret); - flags = IS_SUBORDINATE(db_rep) ? REPMGR_SUBORDINATE : 0; + if (!IS_SUBORDINATE(db_rep)) + flags = 0; + else { + flags = REPMGR_SUBORDINATE; + if (FLD_ISSET(rep->config, REP_C_AUTOTAKEOVER) && + db_rep->repmgr_status == running) + /* + * Takeover is enabled in rep-aware subordinate + * process. + */ + flags |= REPMGR_AUTOTAKEOVER; + } if ((ret = __repmgr_send_handshake(env, conn, NULL, 0, flags)) != 0) return (ret); } conn->state = CONN_READY; + site = SITE_FROM_EID(conn->eid); + /* + * Do not increase sites_avail redundantly for a new outgoing + * connection from a subordinate process. + */ + if (conn->type == REP_CONNECTION && + site->state == SITE_CONNECTED && !IS_SUBORDINATE(db_rep)) { + rep->sites_avail++; + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "vers_resp: EID %lu CONNECTED, READY. sites_avail %lu", + (u_long)conn->eid, (u_long)rep->sites_avail)); + } return (ret); } @@ -1641,10 +2205,11 @@ __repmgr_find_version_info(env, conn, vi) } static int -accept_handshake(env, conn, hostname) +accept_handshake(env, conn, hostname, subordinate) ENV *env; REPMGR_CONNECTION *conn; char *hostname; + int *subordinate; { __repmgr_handshake_args hs; __repmgr_v2handshake_args hs2; @@ -1653,6 +2218,7 @@ accept_handshake(env, conn, hostname) u_int32_t ack, flags; int electable; + *subordinate = 0; switch (conn->version) { case 2: if (__repmgr_v2handshake_unmarshal(env, &hs2, @@ -1674,6 +2240,8 @@ accept_handshake(env, conn, hostname) ack = 0; break; case 4: + case 5: + case 6: if (__repmgr_handshake_unmarshal(env, &hs, conn->input.repmgr_msg.cntrl.data, conn->input.repmgr_msg.cntrl.size, NULL) != 0) @@ -1682,6 +2250,8 @@ accept_handshake(env, conn, hostname) electable = F_ISSET(&hs, ELECTABLE_SITE); flags = hs.flags; ack = hs.ack_policy; + if (LF_ISSET(REPMGR_SUBORDINATE)) + *subordinate = 1; break; default: __db_errx(env, DB_STR_A("3679", @@ -1729,13 +2299,17 @@ process_parameters(env, conn, host, port, ack, electable, flags) u_int32_t ack, flags; { DB_REP *db_rep; + REP *rep; REPMGR_RETRY *retry; REPMGR_SITE *site; + SITEINFO *sites; __repmgr_connect_reject_args reject; + __repmgr_v4connect_reject_args v4reject; u_int8_t reject_buf[__REPMGR_CONNECT_REJECT_SIZE]; int eid, ret; db_rep = env->rep_handle; + rep = db_rep->region; /* Connection state can be used to discern incoming versus outgoing. */ if (conn->state == CONN_CONNECTED) { @@ -1785,6 +2359,13 @@ process_parameters(env, conn, host, port, ack, electable, flags) TAILQ_INSERT_TAIL(&site->sub_conns, conn, entries); conn->eid = eid; + conn->auto_takeover = + LF_ISSET(REPMGR_AUTOTAKEOVER) ? 1 : 0; + SET_LISTENER_CAND(conn->auto_takeover, ++); + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "handshake from subordinate %sconnection at site %s:%u EID %u", + LF_ISSET(REPMGR_AUTOTAKEOVER)? + "takeover ": "", host, port, eid)); } else { DB_EVENT(env, DB_EVENT_REP_CONNECT_ESTD, &eid); @@ -1797,6 +2378,7 @@ process_parameters(env, conn, host, port, ack, electable, flags) TAILQ_REMOVE(&db_rep->retries, retry, entries); __os_free(env, retry); + site->ref.retry = NULL; break; case SITE_CONNECTED: /* @@ -1821,6 +2403,16 @@ process_parameters(env, conn, host, port, ack, electable, flags) * don't have to do anything else here. */ break; + case SITE_IDLE: + /* + * This can occur after the heartbeat + * test hook artificially kept this + * site from first trying to connect. + */ + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "handshake from idle site %s:%u EID %u", + host, port, eid)); + break; default: DB_ASSERT(env, FALSE); } @@ -1834,10 +2426,18 @@ process_parameters(env, conn, host, port, ack, electable, flags) RPRINT(env, (env, DB_VERB_REPMGR_MISC, "rejecting connection from unknown or provisional site %s:%u", host, port)); - reject.version = db_rep->membership_version; - reject.gen = db_rep->member_version_gen; - __repmgr_connect_reject_marshal(env, - &reject, reject_buf); + if (conn->version < 5) { + v4reject.version = db_rep->membership_version; + v4reject.gen = db_rep->member_version_gen; + __repmgr_v4connect_reject_marshal(env, + &v4reject, reject_buf); + } else { + reject.version = db_rep->membership_version; + reject.gen = db_rep->member_version_gen; + reject.status = (site) ? site->membership : 0; + __repmgr_connect_reject_marshal(env, + &reject, reject_buf); + } if ((ret = __repmgr_send_own_msg(env, conn, REPMGR_CONNECT_REJECT, reject_buf, @@ -1867,7 +2467,8 @@ process_parameters(env, conn, host, port, ack, electable, flags) */ if (!IS_SUBORDINATE(db_rep) && /* us */ !__repmgr_master_is_known(env) && - !LF_ISSET(REPMGR_SUBORDINATE)) { /* the remote site */ + !LF_ISSET(REPMGR_SUBORDINATE) && /* the remote site */ + !IS_PREFMAS_MODE(env)) { RPRINT(env, (env, DB_VERB_REPMGR_MISC, "handshake with no known master to wake election thread")); db_rep->new_connection = TRUE; @@ -1980,6 +2581,7 @@ record_permlsn(env, conn) */ if (ackp->lsn.file > site->max_ack.file) do_log_check = 1; + site->max_ack_gen = ackp->generation; memcpy(&site->max_ack, &ackp->lsn, sizeof(DB_LSN)); if (do_log_check) check_min_log_file(env); diff --git a/src/repmgr/repmgr_stat.c b/src/repmgr/repmgr_stat.c index fd6dabd3..215f4719 100644 --- a/src/repmgr/repmgr_stat.c +++ b/src/repmgr/repmgr_stat.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2005, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2005, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -55,7 +55,9 @@ __repmgr_stat(env, statp, flags) { DB_REP *db_rep; DB_REPMGR_STAT *copy, *stats; - uintmax_t tmp; + REPMGR_SITE *site; + u_int32_t tmp; + u_int i; int ret; db_rep = env->rep_handle; @@ -73,6 +75,20 @@ __repmgr_stat(env, statp, flags) memset(stats, 0, sizeof(DB_REPMGR_STAT)); stats->st_max_elect_threads = tmp; } + stats->st_incoming_queue_gbytes = db_rep->input_queue.gbytes; + stats->st_incoming_queue_bytes = db_rep->input_queue.bytes; + LOCK_MUTEX(db_rep->mutex); + for (i = 0; i < db_rep->site_cnt; i++) { + site = SITE_FROM_EID(i); + if (site->membership != 0) { + copy->st_site_total++; + if (FLD_ISSET(site->gmdb_flags, SITE_VIEW)) + copy->st_site_views++; + else + copy->st_site_participants++; + } + } + UNLOCK_MUTEX(db_rep->mutex); *statp = copy; return (0); @@ -148,6 +164,11 @@ __repmgr_print_stats(env, flags) (u_long)sp->st_msgs_queued); __db_dl(env, "Number of messages discarded due to queue length", (u_long)sp->st_msgs_dropped); + __db_dlbytes(env, "Incoming message size in queue", + (u_long)sp->st_incoming_queue_gbytes, (u_long)0, + (u_long)sp->st_incoming_queue_bytes); + __db_dl(env, "Number of messages discarded due to incoming queue full", + (u_long)sp->st_incoming_msgs_dropped); __db_dl(env, "Number of existing connections dropped", (u_long)sp->st_connection_drop); __db_dl(env, "Number of failed new connection attempts", @@ -156,6 +177,14 @@ __repmgr_print_stats(env, flags) (u_long)sp->st_elect_threads); __db_dl(env, "Election threads for which space is reserved", (u_long)sp->st_max_elect_threads); + __db_dl(env, "Number of participant sites in replication group", + (u_long)sp->st_site_participants); + __db_dl(env, "Total number of sites in replication group", + (u_long)sp->st_site_total); + __db_dl(env, "Number of view sites in replication group", + (u_long)sp->st_site_views); + __db_dl(env, "Number of automatic replication process takeovers", + (u_long)sp->st_takeovers); __os_ufree(env, sp); @@ -171,7 +200,7 @@ __repmgr_print_sites(env) u_int count, i; int ret; - if ((ret = __repmgr_site_list(env->dbenv, &count, &list)) != 0) + if ((ret = __repmgr_site_list_int(env, &count, &list)) != 0) return (ret); if (count == 0) @@ -189,6 +218,9 @@ __repmgr_print_sites(env) list[i].status == DB_REPMGR_CONNECTED ? "" : "dis"); __db_msgadd(env, &mb, ", %speer", F_ISSET(&list[i], DB_REPMGR_ISPEER) ? "" : "non-"); + __db_msgadd(env, &mb, ", %s", + F_ISSET(&list[i], DB_REPMGR_ISVIEW) ? + "view" : "participant"); __db_msgadd(env, &mb, ")"); DB_MSGBUF_FLUSH(env, &mb); } @@ -238,26 +270,46 @@ __repmgr_stat_print_pp(dbenv, flags) #endif /* - * PUBLIC: int __repmgr_site_list __P((DB_ENV *, u_int *, DB_REPMGR_SITE **)); + * PUBLIC: int __repmgr_site_list_pp + * PUBLIC: __P((DB_ENV *, u_int *, DB_REPMGR_SITE **)); */ int -__repmgr_site_list(dbenv, countp, listp) +__repmgr_site_list_pp(dbenv, countp, listp) DB_ENV *dbenv; u_int *countp; DB_REPMGR_SITE **listp; { - DB_REP *db_rep; - REP *rep; - DB_REPMGR_SITE *status; ENV *env; DB_THREAD_INFO *ip; + int ret; + + env = dbenv->env; + + ENV_ENTER(env, ip); + ret = __repmgr_site_list_int(env, countp, listp); + ENV_LEAVE(env, ip); + + return (ret); +} + +/* + * PUBLIC: int __repmgr_site_list_int __P((ENV *, u_int *, DB_REPMGR_SITE **)); + */ +int +__repmgr_site_list_int(env, countp, listp) + ENV *env; + u_int *countp; + DB_REPMGR_SITE **listp; +{ + DB_REP *db_rep; + DB_REPMGR_SITE *status; + REP *rep; REPMGR_SITE *site; size_t array_size, total_size; int eid, locked, ret; u_int count, i; char *name; - env = dbenv->env; db_rep = env->rep_handle; ret = 0; @@ -269,10 +321,8 @@ __repmgr_site_list(dbenv, countp, listp) LOCK_MUTEX(db_rep->mutex); locked = TRUE; - ENV_ENTER(env, ip); if (rep->siteinfo_seq > db_rep->siteinfo_seq) ret = __repmgr_sync_siteaddr(env); - ENV_LEAVE(env, ip); if (ret != 0) goto err; } else { @@ -329,6 +379,8 @@ __repmgr_site_list(dbenv, countp, listp) if (FLD_ISSET(site->config, DB_REPMGR_PEER)) F_SET(&status[i], DB_REPMGR_ISPEER); + if (FLD_ISSET(site->gmdb_flags, SITE_VIEW)) + F_SET(&status[i], DB_REPMGR_ISVIEW); /* * If we haven't started a communications thread, connection diff --git a/src/repmgr/repmgr_stub.c b/src/repmgr/repmgr_stub.c index 734c2240..999b759f 100644 --- a/src/repmgr/repmgr_stub.c +++ b/src/repmgr/repmgr_stub.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 1996, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 1996, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -73,6 +73,69 @@ __repmgr_set_ack_policy(dbenv, policy) /* * PUBLIC: #ifndef HAVE_REPLICATION_THREADS + * PUBLIC: int __repmgr_get_incoming_queue_max __P((DB_ENV *, u_int32_t *, + * PUBLIC: u_int32_t *)); + * PUBLIC: #endif + */ +int +__repmgr_get_incoming_queue_max(dbenv, messagesp, bulk_messagesp) + DB_ENV *dbenv; + u_int32_t *messagesp; + u_int32_t *bulk_messagesp; +{ + COMPQUIET(messagesp, NULL); + COMPQUIET(bulk_messagesp, NULL); + return (__db_norepmgr(dbenv)); +} + +/* + * PUBLIC: #ifndef HAVE_REPLICATION_THREADS + * PUBLIC: int __repmgr_set_incoming_queue_max __P((DB_ENV *, u_int32_t, + * PUBLIC: u_int32_t)); + * PUBLIC: #endif + */ +int +__repmgr_set_incoming_queue_max(dbenv, messages, bulk_messages) + DB_ENV *dbenv; + u_int32_t messages; + u_int32_t bulk_messages; +{ + COMPQUIET(messages, 0); + COMPQUIET(bulk_messages, 0); + return (__db_norepmgr(dbenv)); +} + +/* + * PUBLIC: #ifndef HAVE_REPLICATION_THREADS + * PUBLIC: int __repmgr_get_incoming_queue_redzone __P((DB_ENV *, + * PUBLIC: u_int32_t *, u_int32_t *)); + * PUBLIC: #endif + */ +int __repmgr_get_incoming_queue_redzone(dbenv, gbytesp, bytesp) + DB_ENV *dbenv; + u_int32_t *gbytesp, *bytesp; +{ + COMPQUIET(gbytesp, NULL); + COMPQUIET(bytesp, NULL); + return (__db_norepmgr(dbenv)); +} + +/* + * PUBLIC: #ifndef HAVE_REPLICATION_THREADS + * PUBLIC: int __repmgr_get_incoming_queue_fullevent __P((DB_ENV *, + * PUBLIC: int *)); + * PUBLIC: #endif + */ +int __repmgr_get_incoming_queue_fullevent(dbenv, onoffp) + DB_ENV *dbenv; + int *onoffp; +{ + COMPQUIET(onoffp, NULL); + return (__db_norepmgr(dbenv)); +} + +/* + * PUBLIC: #ifndef HAVE_REPLICATION_THREADS * PUBLIC: int __repmgr_site * PUBLIC: __P((DB_ENV *, const char *, u_int, DB_SITE **, u_int32_t)); * PUBLIC: #endif @@ -125,11 +188,12 @@ __repmgr_local_site(dbenv, dbsitep) /* * PUBLIC: #ifndef HAVE_REPLICATION_THREADS - * PUBLIC: int __repmgr_site_list __P((DB_ENV *, u_int *, DB_REPMGR_SITE **)); + * PUBLIC: int __repmgr_site_list_pp + * PUBLIC: __P((DB_ENV *, u_int *, DB_REPMGR_SITE **)); * PUBLIC: #endif */ int -__repmgr_site_list(dbenv, countp, listp) +__repmgr_site_list_pp(dbenv, countp, listp) DB_ENV *dbenv; u_int *countp; DB_REPMGR_SITE **listp; @@ -141,11 +205,11 @@ __repmgr_site_list(dbenv, countp, listp) /* * PUBLIC: #ifndef HAVE_REPLICATION_THREADS - * PUBLIC: int __repmgr_start __P((DB_ENV *, int, u_int32_t)); + * PUBLIC: int __repmgr_start_pp __P((DB_ENV *, int, u_int32_t)); * PUBLIC: #endif */ int -__repmgr_start(dbenv, nthreads, flags) +__repmgr_start_pp(dbenv, nthreads, flags) DB_ENV *dbenv; int nthreads; u_int32_t flags; diff --git a/src/repmgr/repmgr_util.c b/src/repmgr/repmgr_util.c index c2439436..1c5ebe59 100644 --- a/src/repmgr/repmgr_util.c +++ b/src/repmgr/repmgr_util.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2005, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2005, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -15,9 +15,13 @@ #define INITIAL_SITES_ALLOCATION 3 /* Arbitrary guess. */ +static int convert_gmdb(ENV *, DB_THREAD_INFO *, DB *, DB_TXN *); static int get_eid __P((ENV *, const char *, u_int, int *)); -static int __repmgr_addrcmp __P((repmgr_netaddr_t *, repmgr_netaddr_t *)); static int read_gmdb __P((ENV *, DB_THREAD_INFO *, u_int8_t **, size_t *)); +static int __repmgr_addrcmp __P((repmgr_netaddr_t *, repmgr_netaddr_t *)); +static int __repmgr_find_commit __P((ENV *, DB_LSN *, DB_LSN *, int *)); +static int __repmgr_remote_lsnhist(ENV *, int, u_int32_t, + __repmgr_lsnhist_match_args *); /* * Schedules a future attempt to re-establish a connection with the given site. @@ -43,6 +47,8 @@ __repmgr_schedule_connection_attempt(env, eid, immediate) REP *rep; REPMGR_RETRY *retry, *target; REPMGR_SITE *site; + SITEINFO *sites; + db_timeout_t timeout; db_timespec t; int ret; @@ -57,7 +63,24 @@ __repmgr_schedule_connection_attempt(env, eid, immediate) if (immediate) TAILQ_INSERT_HEAD(&db_rep->retries, retry, entries); else { - TIMESPEC_ADD_DB_TIMEOUT(&t, rep->connection_retry_wait); + /* + * Normally we retry a connection after connection retry + * timeout. In a subordinate rep-aware process, we retry sooner + * when there is a listener candidate on the disconnected site. + * The listener process will be connected from the new listener, + * but subordinate rep-aware process can only wait for retry. + * It matters when the subordinate process becomes listener and + * the disconnected site is master. The m_listener_wait is set + * to retry after enough time has passed for a takeover. The + * number of listener candidates is maintained in the listener + * process as it has connections to all subordinate processes + * from other sites. + */ + timeout = rep->connection_retry_wait; + CHECK_LISTENER_CAND(timeout, >0, db_rep->m_listener_wait, + timeout); + TIMESPEC_ADD_DB_TIMEOUT(&t, timeout); + /* * Insert the new "retry" on the (time-ordered) list in its * proper position. To do so, find the list entry ("target") @@ -284,6 +307,7 @@ __repmgr_new_site(env, sitep, host, port) site->net_addr.host = p; site->net_addr.port = (u_int16_t)port; + site->max_ack_gen = 0; ZERO_LSN(site->max_ack); site->ack_policy = 0; site->alignment = 0; @@ -295,6 +319,7 @@ __repmgr_new_site(env, sitep, host, port) site->state = SITE_IDLE; site->membership = 0; + site->gmdb_flags = 0; site->config = 0; *sitep = site; @@ -535,11 +560,14 @@ __repmgr_thread_failure(env, why) int why; { DB_REP *db_rep; + DB_THREAD_INFO *ip; db_rep = env->rep_handle; + ENV_ENTER(env, ip); LOCK_MUTEX(db_rep->mutex); (void)__repmgr_stop_threads(env); UNLOCK_MUTEX(db_rep->mutex); + ENV_LEAVE(env, ip); return (__env_panic(env, why)); } @@ -597,12 +625,13 @@ __repmgr_format_addr_loc(addr, buffer) } /* - * PUBLIC: int __repmgr_repstart __P((ENV *, u_int32_t)); + * PUBLIC: int __repmgr_repstart __P((ENV *, u_int32_t, u_int32_t)); */ int -__repmgr_repstart(env, flags) +__repmgr_repstart(env, flags, startopts) ENV *env; u_int32_t flags; + u_int32_t startopts; { DBT my_addr; int ret; @@ -610,7 +639,11 @@ __repmgr_repstart(env, flags) /* Include "cdata" in case sending to old-version site. */ if ((ret = __repmgr_prepare_my_addr(env, &my_addr)) != 0) return (ret); - ret = __rep_start_int(env, &my_addr, flags); + /* + * force_role_chg and hold_client_gen are used by preferred master + * mode to help control site startup. + */ + ret = __rep_start_int(env, &my_addr, flags, startopts); __os_free(env, my_addr.data); if (ret != 0) __db_err(env, ret, DB_STR("3673", "rep_start")); @@ -618,11 +651,12 @@ __repmgr_repstart(env, flags) } /* - * PUBLIC: int __repmgr_become_master __P((ENV *)); + * PUBLIC: int __repmgr_become_master __P((ENV *, u_int32_t)); */ int -__repmgr_become_master(env) +__repmgr_become_master(env, startopts) ENV *env; + u_int32_t startopts; { DB_REP *db_rep; DB_THREAD_INFO *ip; @@ -631,7 +665,7 @@ __repmgr_become_master(env) REPMGR_SITE *site; DBT key_dbt, data_dbt; __repmgr_membership_key_args key; - __repmgr_membership_data_args member_status; + __repmgr_membership_data_args member_data; repmgr_netaddr_t addr; u_int32_t status; u_int8_t data_buf[__REPMGR_MEMBERSHIP_DATA_SIZE]; @@ -668,16 +702,23 @@ __repmgr_become_master(env) db_rep->client_intent = FALSE; UNLOCK_MUTEX(db_rep->mutex); - if ((ret = __repmgr_repstart(env, DB_REP_MASTER)) != 0) + if ((ret = __repmgr_repstart(env, DB_REP_MASTER, startopts)) != 0) return (ret); + /* + * Make sure member_version_gen is current so that this master + * can reject obsolete member lists from other sites. + */ + db_rep->member_version_gen = db_rep->region->gen; + + /* If there is already a gmdb, we are finished. */ if (db_rep->have_gmdb) return (0); - db_rep->member_version_gen = db_rep->region->gen; - ENV_ENTER(env, ip); + /* There isn't a gmdb. Create one from the in-memory site list. */ if ((ret = __repmgr_hold_master_role(env, NULL)) != 0) goto leave; + ENV_GET_THREAD_INFO(env, ip); retry: if ((ret = __repmgr_setup_gmdb_op(env, ip, &txn, DB_CREATE)) != 0) goto err; @@ -705,8 +746,9 @@ retry: &key, key_buf, sizeof(key_buf), &len); DB_ASSERT(env, ret == 0); DB_INIT_DBT(key_dbt, key_buf, len); - member_status.flags = status; - __repmgr_membership_data_marshal(env, &member_status, data_buf); + member_data.status = status; + member_data.flags = site->gmdb_flags; + __repmgr_membership_data_marshal(env, &member_data, data_buf); DB_INIT_DBT(data_dbt, data_buf, __REPMGR_MEMBERSHIP_DATA_SIZE); if ((ret = __db_put(dbp, ip, txn, &key_dbt, &data_dbt, 0)) != 0) goto err; @@ -726,7 +768,6 @@ err: if ((t_ret = __repmgr_rlse_master_role(env)) != 0 && ret == 0) ret = t_ret; leave: - ENV_LEAVE(env, ip); return (ret); } @@ -840,6 +881,14 @@ __repmgr_open(env, rep_) rep->election_retry_wait = db_rep->election_retry_wait; rep->heartbeat_monitor_timeout = db_rep->heartbeat_monitor_timeout; rep->heartbeat_frequency = db_rep->heartbeat_frequency; + rep->inqueue_max_gbytes = db_rep->inqueue_max_gbytes; + rep->inqueue_max_bytes = db_rep->inqueue_max_bytes; + if (rep->inqueue_max_gbytes == 0 && rep->inqueue_max_bytes == 0) { + rep->inqueue_max_bytes = DB_REPMGR_DEFAULT_INQUEUE_MAX; + } + __repmgr_set_incoming_queue_redzone(rep, rep->inqueue_max_gbytes, + rep->inqueue_max_bytes); + return (ret); } @@ -958,6 +1007,18 @@ __repmgr_join(env, rep_) } db_rep->siteinfo_seq = rep->siteinfo_seq; + /* + * Update the incoming queue limit settings if necessary. + */ + if ((db_rep->inqueue_max_gbytes != 0 || + db_rep->inqueue_max_bytes != 0) && + (db_rep->inqueue_max_gbytes != rep->inqueue_max_gbytes || + db_rep->inqueue_max_bytes != rep->inqueue_max_gbytes)) { + rep->inqueue_max_gbytes = db_rep->inqueue_max_gbytes; + rep->inqueue_max_bytes = db_rep->inqueue_max_bytes; + __repmgr_set_incoming_queue_redzone(rep, + rep->inqueue_max_gbytes, rep->inqueue_max_bytes); + } unlock: MUTEX_UNLOCK(env, rep->mtx_repmgr); return (ret); @@ -1073,6 +1134,7 @@ __repmgr_share_netaddrs(env, rep_, start, limit) shared_array[eid].addr.port = db_rep->sites[i].net_addr.port; shared_array[eid].config = db_rep->sites[i].config; shared_array[eid].status = db_rep->sites[i].membership; + shared_array[eid].flags = db_rep->sites[i].gmdb_flags; RPRINT(env, (env, DB_VERB_REPMGR_MISC, "EID %d is assigned for site %s:%lu", eid, host, (u_long)shared_array[eid].addr.port)); @@ -1134,6 +1196,7 @@ __repmgr_copy_in_added_sites(env) site = SITE_FROM_EID(i); site->config = p->config; site->membership = p->status; + site->gmdb_flags = p->flags; } out: @@ -1266,7 +1329,9 @@ __repmgr_stable_lsn(env, stable_lsn) db_rep = env->rep_handle; rep = db_rep->region; - if (rep->min_log_file != 0 && rep->min_log_file < stable_lsn->file) { + LOCK_MUTEX(db_rep->mutex); + if (rep->sites_avail != 0 && rep->min_log_file != 0 && + rep->min_log_file < stable_lsn->file) { /* * Returning an LSN to be consistent with the rest of the * log archiving processing. Construct LSN of format @@ -1276,12 +1341,91 @@ __repmgr_stable_lsn(env, stable_lsn) stable_lsn->offset = 0; } RPRINT(env, (env, DB_VERB_REPMGR_MISC, - "Repmgr_stable_lsn: Returning stable_lsn[%lu][%lu]", - (u_long)stable_lsn->file, (u_long)stable_lsn->offset)); +"Repmgr_stable_lsn: Returning stable_lsn[%lu][%lu] sites_avail %lu min_log %lu", + (u_long)stable_lsn->file, (u_long)stable_lsn->offset, + (u_long)rep->sites_avail, (u_long)rep->min_log_file)); + UNLOCK_MUTEX(db_rep->mutex); return (0); } /* + * PUBLIC: int __repmgr_make_request_conn __P((ENV *, + * PUBLIC: repmgr_netaddr_t *, REPMGR_CONNECTION **)); + */ +int +__repmgr_make_request_conn(env, addr, connp) + ENV *env; + repmgr_netaddr_t *addr; + REPMGR_CONNECTION **connp; +{ + DBT vi; + __repmgr_msg_hdr_args msg_hdr; + __repmgr_version_confirmation_args conf; + REPMGR_CONNECTION *conn; + int alloc, ret, unused; + + alloc = FALSE; + if ((ret = __repmgr_connect(env, addr, &conn, &unused)) != 0) + return (ret); + conn->type = APP_CONNECTION; + + /* Read a handshake msg, to get version confirmation and parameters. */ + if ((ret = __repmgr_read_conn(conn)) != 0) + goto err; + /* + * We can only get here after having read the full 9 bytes that we + * expect, so this can't fail. + */ + DB_ASSERT(env, conn->reading_phase == SIZES_PHASE); + ret = __repmgr_msg_hdr_unmarshal(env, &msg_hdr, + conn->msg_hdr_buf, __REPMGR_MSG_HDR_SIZE, NULL); + DB_ASSERT(env, ret == 0); + __repmgr_iovec_init(&conn->iovecs); + conn->reading_phase = DATA_PHASE; + + if ((ret = __repmgr_prepare_simple_input(env, conn, &msg_hdr)) != 0) + goto err; + alloc = TRUE; + + if ((ret = __repmgr_read_conn(conn)) != 0) + goto err; + + /* + * Analyze the handshake msg, and stash relevant info. + */ + if ((ret = __repmgr_find_version_info(env, conn, &vi)) != 0) + goto err; + DB_ASSERT(env, vi.size > 0); + if ((ret = __repmgr_version_confirmation_unmarshal(env, + &conf, vi.data, vi.size, NULL)) != 0) + goto err; + + if (conf.version < GM_MIN_VERSION || + (IS_VIEW_SITE(env) && conf.version < VIEW_MIN_VERSION) || + (PREFMAS_IS_SET(env) && conf.version < PREFMAS_MIN_VERSION)) { + ret = DB_REP_UNAVAIL; + goto err; + } + conn->version = conf.version; + +err: + if (alloc) { + DB_ASSERT(env, conn->input.repmgr_msg.cntrl.size > 0); + __os_free(env, conn->input.repmgr_msg.cntrl.data); + DB_ASSERT(env, conn->input.repmgr_msg.rec.size > 0); + __os_free(env, conn->input.repmgr_msg.rec.data); + } + __repmgr_reset_for_reading(conn); + if (ret == 0) + *connp = conn; + else { + (void)__repmgr_close_connection(env, conn); + (void)__repmgr_destroy_conn(env, conn); + } + return (ret); +} + +/* * PUBLIC: int __repmgr_send_sync_msg __P((ENV *, REPMGR_CONNECTION *, * PUBLIC: u_int32_t, u_int8_t *, u_int32_t)); */ @@ -1311,15 +1455,511 @@ __repmgr_send_sync_msg(env, conn, type, buf, len) } /* + * Reads a whole message, when we expect to get a REPMGR_OWN_MSG. + */ +/* + * PUBLIC: int __repmgr_read_own_msg __P((ENV *, REPMGR_CONNECTION *, + * PUBLIC: u_int32_t *, u_int8_t **, size_t *)); + */ +int +__repmgr_read_own_msg(env, conn, typep, bufp, lenp) + ENV *env; + REPMGR_CONNECTION *conn; + u_int32_t *typep; + u_int8_t **bufp; + size_t *lenp; +{ + __repmgr_msg_hdr_args msg_hdr; + u_int8_t *buf; + u_int32_t type; + size_t size; + int ret; + + __repmgr_reset_for_reading(conn); + if ((ret = __repmgr_read_conn(conn)) != 0) + goto err; + ret = __repmgr_msg_hdr_unmarshal(env, &msg_hdr, + conn->msg_hdr_buf, __REPMGR_MSG_HDR_SIZE, NULL); + DB_ASSERT(env, ret == 0); + + if ((conn->msg_type = msg_hdr.type) != REPMGR_OWN_MSG) { + ret = DB_REP_UNAVAIL; /* Protocol violation. */ + goto err; + } + type = REPMGR_OWN_MSG_TYPE(msg_hdr); + if ((size = (size_t)REPMGR_OWN_BUF_SIZE(msg_hdr)) > 0) { + conn->reading_phase = DATA_PHASE; + __repmgr_iovec_init(&conn->iovecs); + + if ((ret = __os_malloc(env, size, &buf)) != 0) + goto err; + conn->input.rep_message = NULL; + + __repmgr_add_buffer(&conn->iovecs, buf, size); + if ((ret = __repmgr_read_conn(conn)) != 0) { + __os_free(env, buf); + goto err; + } + *bufp = buf; + } + + *typep = type; + *lenp = size; + +err: + return (ret); +} + +/* + * Returns TRUE if we are connected to the other site in a preferred + * master replication group, FALSE otherwise. + * + * PUBLIC: int __repmgr_prefmas_connected __P((ENV *)); + */ +int +__repmgr_prefmas_connected(env) + ENV *env; +{ + DB_REP *db_rep; + REPMGR_CONNECTION *conn; + REPMGR_SITE *other_site; + + db_rep = env->rep_handle; + + /* + * Preferred master mode only has 2 sites, so the other site is + * always EID 1. + */ + if (!IS_PREFMAS_MODE(env) || !IS_KNOWN_REMOTE_SITE(1)) + return (FALSE); + + other_site = SITE_FROM_EID(1); + if (other_site->state == SITE_CONNECTED) + return (TRUE); + + if ((conn = other_site->ref.conn.in) != NULL && + IS_READY_STATE(conn->state)) + return (TRUE); + if ((conn = other_site->ref.conn.out) != NULL && + IS_READY_STATE(conn->state)) + return (TRUE); + + return (FALSE); +} + +/* + * Used by a preferred master site to restart the remote temporary master + * site as a client. This is used to help guarantee that the preferred master + * site's transactions are never rolled back. + * + * PUBLIC: int __repmgr_restart_site_as_client __P((ENV *, int)); + */ +int +__repmgr_restart_site_as_client(env, eid) + ENV *env; + int eid; +{ + DB_REP *db_rep; + REPMGR_CONNECTION *conn; + repmgr_netaddr_t addr; + u_int32_t type; + size_t len; + u_int8_t any_value, *response_buf; + int ret, t_ret; + + COMPQUIET(any_value, 0); + db_rep = env->rep_handle; + conn = NULL; + + if (!IS_PREFMAS_MODE(env)) + return (0); + + LOCK_MUTEX(db_rep->mutex); + addr = SITE_FROM_EID(eid)->net_addr; + UNLOCK_MUTEX(db_rep->mutex); + if ((ret = __repmgr_make_request_conn(env, &addr, &conn)) != 0) + return (ret); + + /* + * No payload needed, but must send at least a dummy byte for the + * other side to recognize that a message has arrived. + */ + if ((ret = __repmgr_send_sync_msg(env, conn, + REPMGR_RESTART_CLIENT, VOID_STAR_CAST &any_value, 1)) != 0) + goto err; + + if ((ret = __repmgr_read_own_msg(env, + conn, &type, &response_buf, &len)) != 0) + goto err; + if (type != REPMGR_PREFMAS_SUCCESS) { + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "restart_site_as_client got unexpected message type %d", + type)); + ret = DB_REP_UNAVAIL; /* Invalid response: protocol violation */ + } +err: + if (conn != NULL) { + if ((t_ret = __repmgr_close_connection(env, + conn)) != 0 && ret != 0) + ret = t_ret; + if ((t_ret = __repmgr_destroy_conn(env, + conn)) != 0 && ret != 0) + ret = t_ret; + } + return (ret); +} + +/* + * Used by a preferred master site to make the remote temporary master + * site a readonly master. This is used to help preserve all temporary + * master transactions. + * + * PUBLIC: int __repmgr_make_site_readonly_master __P((ENV *, int, + * PUBLIC: u_int32_t *, DB_LSN *)); + */ +int +__repmgr_make_site_readonly_master(env, eid, gen, sync_lsnp) + ENV *env; + int eid; + u_int32_t *gen; + DB_LSN *sync_lsnp; +{ + DB_REP *db_rep; + REPMGR_CONNECTION *conn; + repmgr_netaddr_t addr; + __repmgr_permlsn_args permlsn; + u_int32_t type; + size_t len; + u_int8_t any_value, *response_buf; + int ret, t_ret; + + COMPQUIET(any_value, 0); + db_rep = env->rep_handle; + conn = NULL; + response_buf = NULL; + *gen = 0; + ZERO_LSN(*sync_lsnp); + + if (!IS_PREFMAS_MODE(env)) + return (0); + + LOCK_MUTEX(db_rep->mutex); + addr = SITE_FROM_EID(eid)->net_addr; + UNLOCK_MUTEX(db_rep->mutex); + if ((ret = __repmgr_make_request_conn(env, &addr, &conn)) != 0) + return (ret); + + /* + * No payload needed, but must send at least a dummy byte for the + * other side to recognize that a message has arrived. + */ + if ((ret = __repmgr_send_sync_msg(env, conn, + REPMGR_READONLY_MASTER, VOID_STAR_CAST &any_value, 1)) != 0) + goto err; + + if ((ret = __repmgr_read_own_msg(env, + conn, &type, &response_buf, &len)) != 0) + goto err; + + if (type == REPMGR_READONLY_RESPONSE) { + if ((ret = __repmgr_permlsn_unmarshal(env, + &permlsn, response_buf, len, NULL)) != 0) + goto err; + *gen = permlsn.generation; + *sync_lsnp = permlsn.lsn; + } else { + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "make_site_readonly_master got unexpected message type %d", + type)); + ret = DB_REP_UNAVAIL; /* Invalid response: protocol violation */ + } + +err: + if (conn != NULL) { + if ((t_ret = __repmgr_close_connection(env, + conn)) != 0 && ret != 0) + ret = t_ret; + if ((t_ret = __repmgr_destroy_conn(env, + conn)) != 0 && ret != 0) + ret = t_ret; + } + if (response_buf != NULL) + __os_free(env, response_buf); + return (ret); +} + +/* + * Used by a preferred master site to perform the LSN history comparisons to + * determine whether there is are continuous or conflicting sets of + * transactions between this site and the remote temporary master. + * + * PUBLIC: int __repmgr_lsnhist_match __P((ENV *, + * PUBLIC: DB_THREAD_INFO *, int, int *)); + */ +int +__repmgr_lsnhist_match(env, ip, eid, match) + ENV *env; + DB_THREAD_INFO *ip; + int eid; + int *match; +{ + DB_REP *db_rep; + REP *rep; + __rep_lsn_hist_data_args my_lsnhist; + __repmgr_lsnhist_match_args remote_lsnhist; + u_int32_t my_gen; + int found_commit, ret; + + db_rep = env->rep_handle; + rep = db_rep->region; + *match = FALSE; + my_gen = rep->gen; + found_commit = FALSE; + + if (!IS_PREFMAS_MODE(env)) + return (0); + + /* Get local LSN history information for comparison. */ + if ((ret = __rep_get_lsnhist_data(env, ip, my_gen, &my_lsnhist)) != 0) + return (ret); + + /* Get remote LSN history information for comparison. */ + ret = __repmgr_remote_lsnhist(env, eid, my_gen, &remote_lsnhist); + + /* + * If the current gen doesn't exist at the remote site, the match + * fails. + * + * If the remote LSN or timestamp at the current gen doesn't match + * ours, we probably had a whack-a-mole situation where each site + * as up and down in isolation one or more times and the match fails. + * + * If the remote LSN for the next generation is lower than this + * site's startup LSN and there are any commit operations between + * these LSNs, there are conflicting sets of transactions and the + * match fails. + */ + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "lsnhist_match my_lsn [%lu][%lu] remote_lsn [%lu][%lu]", + (u_long)my_lsnhist.lsn.file, (u_long)my_lsnhist.lsn.offset, + (u_long)remote_lsnhist.lsn.file, + (u_long)remote_lsnhist.lsn.offset)); + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "lsnhist_match my_time %lu:%lu remote_time %lu:%lu", + (u_long)my_lsnhist.hist_sec, (u_long)my_lsnhist.hist_nsec, + (u_long)remote_lsnhist.hist_sec, (u_long)remote_lsnhist.hist_nsec)); + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "lsnhist_match pminit_lsn [%lu][%lu] next_gen_lsn [%lu][%lu]", + (u_long)db_rep->prefmas_init_lsn.file, + (u_long)db_rep->prefmas_init_lsn.offset, + (u_long)remote_lsnhist.next_gen_lsn.file, + (u_long)remote_lsnhist.next_gen_lsn.offset)); + if (ret != DB_REP_UNAVAIL && + LOG_COMPARE(&my_lsnhist.lsn, &remote_lsnhist.lsn) == 0 && + my_lsnhist.hist_sec == remote_lsnhist.hist_sec && + my_lsnhist.hist_nsec == remote_lsnhist.hist_nsec) { + /* + * If the remote site doesn't yet have the next gen or if + * our startup LSN is <= than the remote next gen LSN, we + * have a match. + * + * Otherwise, our startup LSN is higher than the remote + * next gen LSN. If we have any commit operations between + * these two LSNs, we have preferred master operations we + * must preserve and there is not a match. But if we just + * have uncommitted operations between these LSNs it doesn't + * matter if they are rolled back, so we call it a match and + * try to retain temporary master transactions if possible. + */ + if (IS_ZERO_LSN(remote_lsnhist.next_gen_lsn) || + LOG_COMPARE(&db_rep->prefmas_init_lsn, + &remote_lsnhist.next_gen_lsn) <= 0) + *match = TRUE; + else if ((ret = __repmgr_find_commit(env, + &remote_lsnhist.next_gen_lsn, + &db_rep->prefmas_init_lsn, &found_commit)) == 0 && + !found_commit) { + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "lsnhist_match !found_commit set match TRUE")); + *match = TRUE; + } + } + + /* Don't return an error if current gen didn't exist at remote site. */ + if (ret == DB_REP_UNAVAIL) + ret = 0; + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "lsnhist_match match %d returning %d", *match, ret)); + return (ret); +} + +/* + * Checks a range of log records from low_lsn to high_lsn for any + * commit operations. Sets found_commit to TRUE if a commit is + * found. + */ +static int +__repmgr_find_commit(env, low_lsn, high_lsn, found_commit) + ENV *env; + DB_LSN *low_lsn; + DB_LSN *high_lsn; + int *found_commit; +{ + DB_LOGC *logc; + DB_LSN lsn; + DBT rec; + __txn_regop_args *txn_args; + u_int32_t rectype; + int ret, t_ret; + + *found_commit = FALSE; + ret = 0; + + lsn = *low_lsn; + if ((ret = __log_cursor(env, &logc)) != 0) + return (ret); + memset(&rec, 0, sizeof(rec)); + if (__logc_get(logc, &lsn, &rec, DB_SET) == 0) { + do { + LOGCOPY_32(env, &rectype, rec.data); + if (rectype == DB___txn_regop) { + if ((ret = __txn_regop_read( + env, rec.data, &txn_args)) != 0) + goto close_cursor; + if (txn_args->opcode == TXN_COMMIT) { + *found_commit = TRUE; + __os_free(env, txn_args); + break; + } + __os_free(env, txn_args); + } + } while ((ret = __logc_get(logc, &lsn, &rec, DB_NEXT)) == 0 && + LOG_COMPARE(&lsn, high_lsn) <= 0); + } +close_cursor: + if ((t_ret = __logc_close(logc)) != 0 && ret == 0) + ret = t_ret; + return (ret); +} + +/* + * Used by a preferred master site to get remote LSN history information + * from the other site in the replication group. + */ +static int +__repmgr_remote_lsnhist(env, eid, gen, lsnhist_match) + ENV *env; + int eid; + u_int32_t gen; + __repmgr_lsnhist_match_args *lsnhist_match; +{ + DB_REP *db_rep; + REPMGR_CONNECTION *conn; + repmgr_netaddr_t addr; + __rep_lsn_hist_key_args lsnhist_key; + u_int8_t lsnhist_key_buf[__REP_LSN_HIST_KEY_SIZE]; + u_int32_t type; + size_t len; + u_int8_t *response_buf; + int ret, t_ret; + + db_rep = env->rep_handle; + conn = NULL; + response_buf = NULL; + + if (!IS_KNOWN_REMOTE_SITE(eid)) + return (0); + + LOCK_MUTEX(db_rep->mutex); + addr = SITE_FROM_EID(eid)->net_addr; + UNLOCK_MUTEX(db_rep->mutex); + if ((ret = __repmgr_make_request_conn(env, &addr, &conn)) != 0) + return (ret); + + /* Marshal generation for which to request remote lsnhist data. */ + lsnhist_key.version = REP_LSN_HISTORY_FMT_VERSION; + lsnhist_key.gen = gen; + __rep_lsn_hist_key_marshal(env, &lsnhist_key, lsnhist_key_buf); + if ((ret = __repmgr_send_sync_msg(env, conn, REPMGR_LSNHIST_REQUEST, + lsnhist_key_buf, sizeof(lsnhist_key_buf))) != 0) + goto err; + + if ((ret = __repmgr_read_own_msg(env, + conn, &type, &response_buf, &len)) != 0) + goto err; + + /* Unmarshal remote lsnhist time and LSNs for comparison. */ + if (type == REPMGR_LSNHIST_RESPONSE) { + if ((ret = __repmgr_lsnhist_match_unmarshal(env, lsnhist_match, + response_buf, __REPMGR_LSNHIST_MATCH_SIZE, NULL)) != 0) + goto err; + } else { + /* + * If the other site sent back REPMGR_PREFMAS_FAILURE, it means + * no lsnhist record for the requested gen was found on other + * site. + */ + if (type != REPMGR_PREFMAS_FAILURE) + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "remote_lsnhist got unexpected message type %d", + type)); + ret = DB_REP_UNAVAIL; + } + +err: + if (conn != NULL) { + if ((t_ret = __repmgr_close_connection(env, + conn)) != 0 && ret != 0) + ret = t_ret; + if ((t_ret = __repmgr_destroy_conn(env, + conn)) != 0 && ret != 0) + ret = t_ret; + } + if (response_buf != NULL) + __os_free(env, response_buf); + return (ret); +} + +/* + * Returns the number of tries and the amount of time to yield the + * processor for preferred master waits. The total wait is the larger + * of 2 seconds or 3 * ack_timeout. + * + * PUBLIC: int __repmgr_prefmas_get_wait __P((ENV *, u_int32_t *, u_long *)); + */ +int +__repmgr_prefmas_get_wait(env, tries, yield_usecs) + ENV *env; + u_int32_t *tries; + u_long *yield_usecs; +{ + DB_REP *db_rep; + REP *rep; + db_timeout_t max_wait; + + db_rep = env->rep_handle; + rep = db_rep->region; + + *yield_usecs = 250000; + max_wait = DB_REPMGR_DEFAULT_ACK_TIMEOUT * 2; + if ((rep->ack_timeout * 3) > max_wait) + max_wait = rep->ack_timeout * 3; + *tries = max_wait / (u_int32_t)*yield_usecs; + return (0); +} + +/* * Produce a membership list from the known info currently in memory. * - * PUBLIC: int __repmgr_marshal_member_list __P((ENV *, u_int8_t **, size_t *)); + * PUBLIC: int __repmgr_marshal_member_list __P((ENV *, u_int32_t, + * PUBLIC: u_int8_t **, size_t *)); * * Caller must hold mutex. */ int -__repmgr_marshal_member_list(env, bufp, lenp) +__repmgr_marshal_member_list(env, msg_version, bufp, lenp) ENV *env; + u_int32_t msg_version; u_int8_t **bufp; size_t *lenp; { @@ -1328,6 +1968,7 @@ __repmgr_marshal_member_list(env, bufp, lenp) REPMGR_SITE *site; __repmgr_membr_vers_args membr_vers; __repmgr_site_info_args site_info; + __repmgr_v4site_info_args v4site_info; u_int8_t *buf, *p; size_t bufsize, len; u_int i; @@ -1353,14 +1994,24 @@ __repmgr_marshal_member_list(env, bufp, lenp) if (site->membership == 0) continue; - site_info.host.data = site->net_addr.host; - site_info.host.size = - (u_int32_t)strlen(site->net_addr.host) + 1; - site_info.port = site->net_addr.port; - site_info.flags = site->membership; - - ret = __repmgr_site_info_marshal(env, - &site_info, p, (size_t)(&buf[bufsize]-p), &len); + if (msg_version < 5) { + v4site_info.host.data = site->net_addr.host; + v4site_info.host.size = + (u_int32_t)strlen(site->net_addr.host) + 1; + v4site_info.port = site->net_addr.port; + v4site_info.flags = site->membership; + ret = __repmgr_v4site_info_marshal(env, + &v4site_info, p, (size_t)(&buf[bufsize]-p), &len); + } else { + site_info.host.data = site->net_addr.host; + site_info.host.size = + (u_int32_t)strlen(site->net_addr.host) + 1; + site_info.port = site->net_addr.port; + site_info.status = site->membership; + site_info.flags = site->gmdb_flags; + ret = __repmgr_site_info_marshal(env, + &site_info, p, (size_t)(&buf[bufsize]-p), &len); + } DB_ASSERT(env, ret == 0); p += len; } @@ -1387,7 +2038,7 @@ read_gmdb(env, ip, bufp, lenp) DBC *dbc; DBT key_dbt, data_dbt; __repmgr_membership_key_args key; - __repmgr_membership_data_args member_status; + __repmgr_membership_data_args member_data; __repmgr_member_metadata_args metadata; __repmgr_membr_vers_args membr_vers; __repmgr_site_info_args site_info; @@ -1435,8 +2086,13 @@ read_gmdb(env, ip, bufp, lenp) ret = __repmgr_member_metadata_unmarshal(env, &metadata, metadata_buf, data_dbt.size, NULL); DB_ASSERT(env, ret == 0); - DB_ASSERT(env, metadata.format == REPMGR_GMDB_FMT_VERSION); + DB_ASSERT(env, metadata.format >= REPMGR_GMDB_FMT_MIN_VERSION && + metadata.format <= REPMGR_GMDB_FMT_VERSION); DB_ASSERT(env, metadata.version > 0); + /* Automatic conversion of old format gmdb if needed. */ + if (metadata.format < REPMGR_GMDB_FMT_VERSION && + (ret = convert_gmdb(env, ip, dbp, txn)) != 0) + goto err; bufsize = 1000; /* Initial guess. */ if ((ret = __os_malloc(env, bufsize, &buf)) != 0) @@ -1459,13 +2115,14 @@ read_gmdb(env, ip, bufp, lenp) DB_ASSERT(env, key.port > 0); ret = __repmgr_membership_data_unmarshal(env, - &member_status, data_buf, data_dbt.size, NULL); + &member_data, data_buf, data_dbt.size, NULL); DB_ASSERT(env, ret == 0); - DB_ASSERT(env, member_status.flags != 0); + DB_ASSERT(env, member_data.status != 0); site_info.host = key.host; site_info.port = key.port; - site_info.flags = member_status.flags; + site_info.status = member_data.status; + site_info.flags = member_data.flags; if ((ret = __repmgr_site_info_marshal(env, &site_info, p, (size_t)(&buf[bufsize]-p), &len)) == ENOMEM) { bufsize *= 2; @@ -1501,28 +2158,129 @@ err: } /* + * Convert an older-format group membership database into the current format. + */ +static int +convert_gmdb(env, ip, dbp, txn) + ENV *env; + DB_THREAD_INFO *ip; + DB *dbp; + DB_TXN *txn; +{ + DBC *dbc; + DBT key_dbt, data_dbt, v4data_dbt; + __repmgr_membership_key_args key; + __repmgr_membership_data_args member_data; + __repmgr_v4membership_data_args v4member_data; + __repmgr_member_metadata_args metadata; + u_int8_t data_buf[__REPMGR_MEMBERSHIP_DATA_SIZE]; + u_int8_t key_buf[MAX_MSG_BUF]; + u_int8_t metadata_buf[__REPMGR_MEMBER_METADATA_SIZE]; + u_int8_t v4data_buf[__REPMGR_V4MEMBERSHIP_DATA_SIZE]; + int ret, t_ret; + + dbc = NULL; + + if ((ret = __db_cursor(dbp, ip, txn, &dbc, 0)) != 0) + goto err; + + memset(&key_dbt, 0, sizeof(key_dbt)); + key_dbt.data = key_buf; + key_dbt.ulen = sizeof(key_buf); + F_SET(&key_dbt, DB_DBT_USERMEM); + memset(&data_dbt, 0, sizeof(data_dbt)); + data_dbt.data = metadata_buf; + data_dbt.ulen = sizeof(metadata_buf); + F_SET(&data_dbt, DB_DBT_USERMEM); + memset(&v4data_dbt, 0, sizeof(v4data_dbt)); + v4data_dbt.data = v4data_buf; + v4data_dbt.ulen = sizeof(v4data_buf); + F_SET(&v4data_dbt, DB_DBT_USERMEM); + + /* + * The first gmdb record is a special metadata record that contains + * an empty key and gmdb metadata (format and version) and has already + * been validated by the caller. We need to update its format value + * for this conversion but leave the version alone. + */ + if ((ret = __dbc_get(dbc, &key_dbt, &data_dbt, DB_NEXT)) != 0) + goto err; + ret = __repmgr_membership_key_unmarshal(env, + &key, key_buf, key_dbt.size, NULL); + DB_ASSERT(env, ret == 0); + DB_ASSERT(env, key.host.size == 0); + DB_ASSERT(env, key.port == 0); + ret = __repmgr_member_metadata_unmarshal(env, + &metadata, metadata_buf, data_dbt.size, NULL); + DB_ASSERT(env, ret == 0); + DB_ASSERT(env, metadata.version > 0); + metadata.format = REPMGR_GMDB_FMT_VERSION; + __repmgr_member_metadata_marshal(env, &metadata, metadata_buf); + DB_INIT_DBT(data_dbt, metadata_buf, __REPMGR_MEMBER_METADATA_SIZE); + if ((ret = __dbc_put(dbc, &key_dbt, &data_dbt, DB_CURRENT)) != 0) + goto err; + + /* + * The rest of the gmdb records contain a key (host and port) and + * membership data (status and now flags). But the old format was + * using flags for the status value, so we need to transfer the + * old flags value to status and provide an empty flags value for + * this conversion. + */ + data_dbt.data = data_buf; + data_dbt.ulen = sizeof(data_buf); + while ((ret = __dbc_get(dbc, &key_dbt, &v4data_dbt, DB_NEXT)) == 0) { + /* Get membership data in old format. */ + ret = __repmgr_v4membership_data_unmarshal(env, + &v4member_data, v4data_buf, v4data_dbt.size, NULL); + DB_ASSERT(env, ret == 0); + DB_ASSERT(env, v4member_data.flags != 0); + + /* Convert membership data into current format and update. */ + member_data.status = v4member_data.flags; + member_data.flags = 0; + __repmgr_membership_data_marshal(env, &member_data, data_buf); + DB_INIT_DBT(data_dbt, data_buf, __REPMGR_MEMBERSHIP_DATA_SIZE); + if ((ret = __dbc_put(dbc, + &key_dbt, &data_dbt, DB_CURRENT)) != 0) + goto err; + } + if (ret == DB_NOTFOUND) + ret = 0; + +err: + if (dbc != NULL && (t_ret = __dbc_close(dbc)) != 0 && ret == 0) + ret = t_ret; + return (ret); +} + +/* * Refresh our sites array from the given membership list. * * PUBLIC: int __repmgr_refresh_membership __P((ENV *, - * PUBLIC: u_int8_t *, size_t)); + * PUBLIC: u_int8_t *, size_t, u_int32_t)); */ int -__repmgr_refresh_membership(env, buf, len) +__repmgr_refresh_membership(env, buf, len, version) ENV *env; u_int8_t *buf; size_t len; + u_int32_t version; { DB_REP *db_rep; + REP *rep; REPMGR_SITE *site; __repmgr_membr_vers_args membr_vers; __repmgr_site_info_args site_info; + __repmgr_v4site_info_args v4site_info; char *host; u_int8_t *p; u_int16_t port; - u_int32_t i, n; + u_int32_t i, participants; int eid, ret; db_rep = env->rep_handle; + rep = db_rep->region; /* * Membership list consists of membr_vers followed by a number of @@ -1546,9 +2304,17 @@ __repmgr_refresh_membership(env, buf, len) for (i = 0; i < db_rep->site_cnt; i++) F_CLR(SITE_FROM_EID(i), SITE_TOUCHED); - for (n = 0; p < &buf[len]; ++n) { - ret = __repmgr_site_info_unmarshal(env, - &site_info, p, (size_t)(&buf[len] - p), &p); + for (participants = 0; p < &buf[len]; ) { + if (version < 5) { + ret = __repmgr_v4site_info_unmarshal(env, + &v4site_info, p, (size_t)(&buf[len] - p), &p); + site_info.host = v4site_info.host; + site_info.port = v4site_info.port; + site_info.status = v4site_info.flags; + site_info.flags = 0; + } else + ret = __repmgr_site_info_unmarshal(env, + &site_info, p, (size_t)(&buf[len] - p), &p); DB_ASSERT(env, ret == 0); host = site_info.host.data; @@ -1556,9 +2322,11 @@ __repmgr_refresh_membership(env, buf, len) (u_int8_t*)site_info.host.data + site_info.host.size <= p); host[site_info.host.size-1] = '\0'; port = site_info.port; + if (!FLD_ISSET(site_info.flags, SITE_VIEW)) + participants++; if ((ret = __repmgr_set_membership(env, - host, port, site_info.flags)) != 0) + host, port, site_info.status, site_info.flags)) != 0) goto err; if ((ret = __repmgr_find_site(env, host, port, &eid)) != 0) @@ -1566,8 +2334,13 @@ __repmgr_refresh_membership(env, buf, len) DB_ASSERT(env, IS_VALID_EID(eid)); F_SET(SITE_FROM_EID(eid), SITE_TOUCHED); } - ret = __rep_set_nsites_int(env, n); + ret = __rep_set_nsites_int(env, participants); DB_ASSERT(env, ret == 0); + if (FLD_ISSET(rep->config, + REP_C_PREFMAS_MASTER | REP_C_PREFMAS_CLIENT) && + rep->config_nsites > 2) + __db_errx(env, DB_STR("3703", + "More than two sites in preferred master replication group")); /* Scan "touched" flags so as to notice sites that have been removed. */ for (i = 0; i < db_rep->site_cnt; i++) { @@ -1576,7 +2349,8 @@ __repmgr_refresh_membership(env, buf, len) continue; host = site->net_addr.host; port = site->net_addr.port; - if ((ret = __repmgr_set_membership(env, host, port, 0)) != 0) + if ((ret = __repmgr_set_membership(env, host, port, + 0, site->gmdb_flags)) != 0) goto err; } @@ -1597,13 +2371,13 @@ __repmgr_reload_gmdb(env) size_t len; int ret; - ENV_ENTER(env, ip); + ENV_GET_THREAD_INFO(env, ip); if ((ret = read_gmdb(env, ip, &buf, &len)) == 0) { env->rep_handle->have_gmdb = TRUE; - ret = __repmgr_refresh_membership(env, buf, len); + ret = __repmgr_refresh_membership(env, buf, len, + DB_REPMGR_VERSION); __os_free(env, buf); } - ENV_LEAVE(env, ip); return (ret); } @@ -1650,7 +2424,8 @@ __repmgr_init_save(env, dbt) dbt->data = NULL; dbt->size = 0; ret = 0; - } else if ((ret = __repmgr_marshal_member_list(env, &buf, &len)) == 0) { + } else if ((ret = __repmgr_marshal_member_list(env, + DB_REPMGR_VERSION, &buf, &len)) == 0) { dbt->data = buf; dbt->size = (u_int32_t)len; } @@ -1700,6 +2475,7 @@ __repmgr_defer_op(env, op) */ if ((ret = __os_calloc(env, 1, sizeof(*msg), &msg)) != 0) return (ret); + msg->size = sizeof(*msg); msg->msg_hdr.type = REPMGR_OWN_MSG; REPMGR_OWN_MSG_TYPE(msg->msg_hdr) = op; ret = __repmgr_queue_put(env, msg); @@ -1771,7 +2547,7 @@ __repmgr_become_client(env) if ((ret = __repmgr_await_gmdbop(env)) == 0) db_rep->client_intent = TRUE; UNLOCK_MUTEX(db_rep->mutex); - return (ret == 0 ? __repmgr_repstart(env, DB_REP_CLIENT) : ret); + return (ret == 0 ? __repmgr_repstart(env, DB_REP_CLIENT, 0) : ret); } /* @@ -1897,16 +2673,17 @@ get_eid(env, host, port, eidp) * accordingly. * * PUBLIC: int __repmgr_set_membership __P((ENV *, - * PUBLIC: const char *, u_int, u_int32_t)); + * PUBLIC: const char *, u_int, u_int32_t, u_int32_t)); * * Caller must host db_rep mutex, and be in ENV_ENTER context. */ int -__repmgr_set_membership(env, host, port, status) +__repmgr_set_membership(env, host, port, status, flags) ENV *env; const char *host; u_int port; u_int32_t status; + u_int32_t flags; { DB_REP *db_rep; REP *rep; @@ -1953,7 +2730,9 @@ __repmgr_set_membership(env, host, port, status) /* Set both private and shared copies of the info. */ site->membership = status; + site->gmdb_flags = flags; sites[eid].status = status; + sites[eid].flags = flags; } MUTEX_UNLOCK(env, rep->mtx_repmgr); @@ -1965,7 +2744,8 @@ __repmgr_set_membership(env, host, port, status) SELECTOR_RUNNING(db_rep)) { if (eid == db_rep->self_eid && status != SITE_PRESENT) - ret = DB_DELETED; + ret = (status == SITE_ADDING) ? + __repmgr_defer_op(env, REPMGR_REJOIN) : DB_DELETED; else if (orig != SITE_PRESENT && status == SITE_PRESENT && site->state == SITE_IDLE) { /* @@ -1981,10 +2761,11 @@ __repmgr_set_membership(env, host, port, status) * failure shouldn't hurt anything, because we'll just * naturally try again later. */ - ret = __repmgr_schedule_connection_attempt(env, - eid, TRUE); - if (eid != db_rep->self_eid) + if (eid != db_rep->self_eid) { + ret = __repmgr_schedule_connection_attempt(env, + eid, TRUE); DB_EVENT(env, DB_EVENT_REP_SITE_ADDED, &eid); + } } else if (orig != 0 && status == 0) DB_EVENT(env, DB_EVENT_REP_SITE_REMOVED, &eid); @@ -2084,3 +2865,73 @@ __repmgr_bcast_own_msg(env, type, buf, len) } return (0); } + +/* + * PUBLIC: int __repmgr_bcast_member_list __P((ENV *)); + * + * Broadcast membership list to all other sites in the replication group. + * + * Caller must hold mutex. + */ +int +__repmgr_bcast_member_list(env) + ENV *env; +{ + DB_REP *db_rep; + REPMGR_CONNECTION *conn; + REPMGR_SITE *site; + u_int8_t *buf, *v4buf; + size_t len, v4len; + int ret; + u_int i; + + db_rep = env->rep_handle; + if (!SELECTOR_RUNNING(db_rep)) + return (0); + buf = NULL; + v4buf = NULL; + LOCK_MUTEX(db_rep->mutex); + /* + * Some of the other sites in the replication group might be at + * an older version, so we need to be able to send the membership + * list in the current or older format. + */ + if ((ret = __repmgr_marshal_member_list(env, + DB_REPMGR_VERSION, &buf, &len)) != 0 || + (ret = __repmgr_marshal_member_list(env, + 4, &v4buf, &v4len)) != 0) { + UNLOCK_MUTEX(db_rep->mutex); + goto out; + } + UNLOCK_MUTEX(db_rep->mutex); + + RPRINT(env, (env, DB_VERB_REPMGR_MISC, + "Broadcast latest membership list")); + FOR_EACH_REMOTE_SITE_INDEX(i) { + site = SITE_FROM_EID(i); + if (site->state != SITE_CONNECTED) + continue; + if ((conn = site->ref.conn.in) != NULL && + conn->state == CONN_READY && + (ret = __repmgr_send_own_msg(env, conn, REPMGR_SHARING, + (conn->version < 5 ? v4buf : buf), + (conn->version < 5 ? (u_int32_t) v4len : (u_int32_t)len))) + != 0 && + (ret = __repmgr_bust_connection(env, conn)) != 0) + goto out; + if ((conn = site->ref.conn.out) != NULL && + conn->state == CONN_READY && + (ret = __repmgr_send_own_msg(env, conn, REPMGR_SHARING, + (conn->version < 5 ? v4buf : buf), + (conn->version < 5 ? (u_int32_t)v4len : (u_int32_t)len))) + != 0 && + (ret = __repmgr_bust_connection(env, conn)) != 0) + goto out; + } +out: + if (buf != NULL) + __os_free(env, buf); + if (v4buf != NULL) + __os_free(env, v4buf); + return (ret); +} diff --git a/src/repmgr/repmgr_windows.c b/src/repmgr/repmgr_windows.c index d9c2a03d..8cf05960 100644 --- a/src/repmgr/repmgr_windows.c +++ b/src/repmgr/repmgr_windows.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2005, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2005, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -252,7 +252,7 @@ allocate_wait_slot(env, resultp, table) * the previous wait but before reacquiring the mutex, and this * extra signal would incorrectly cause the next wait to return * immediately. - */ + */ (void)WaitForSingleObject(w->event, 0); *resultp = i; return (0); @@ -639,31 +639,40 @@ __repmgr_select_loop(env) WSAEVENT listen_event; WSANETWORKEVENTS net_events; struct io_info io_info; - int i; + int accept_connect, i; db_rep = env->rep_handle; io_info.connections = connections; io_info.events = events; + accept_connect = FALSE; if ((listen_event = WSACreateEvent()) == WSA_INVALID_EVENT) { __db_err(env, net_errno, DB_STR("3590", "can't create event for listen socket")); return (net_errno); } - if (!IS_SUBORDINATE(db_rep) && - WSAEventSelect(db_rep->listen_fd, listen_event, FD_ACCEPT) == - SOCKET_ERROR) { - ret = net_errno; - __db_err(env, ret, DB_STR("3591", - "can't enable event for listener")); - (void)WSACloseEvent(listen_event); - goto out; - } LOCK_MUTEX(db_rep->mutex); if ((ret = __repmgr_first_try_connections(env)) != 0) goto unlock; for (;;) { + /* + * Set the event for this process to receive notification of + * incoming connections if this process is or has just taken + * over as the listener process. + */ + if (!IS_SUBORDINATE(db_rep) && !accept_connect) { + if (WSAEventSelect(db_rep->listen_fd, listen_event, + FD_ACCEPT) == SOCKET_ERROR) { + ret = net_errno; + __db_err(env, ret, DB_STR("3700", + "can't enable event for listener")); + (void)WSACloseEvent(listen_event); + goto out; + } + accept_connect = TRUE; + } + /* Start with the two events that we always wait for. */ #define SIGNALER_INDEX 0 #define LISTENER_INDEX 1 @@ -714,6 +723,8 @@ __repmgr_select_loop(env) ret = net_errno; goto unlock; } + if (net_events.lNetworkEvents == 0) + continue; DB_ASSERT(env, net_events.lNetworkEvents & FD_ACCEPT); if ((ret = net_events.iErrorCode[FD_ACCEPT_BIT]) @@ -815,7 +826,16 @@ handle_completion(env, conn) /* Check both writing and reading. */ if (events.lNetworkEvents & FD_CLOSE) { error = events.iErrorCode[FD_CLOSE_BIT]; - goto report; + + /* + * There could be data for reading when we see FD_CLOSE, + * so we should try reading in this case. + */ + if (error != 0) + goto report; + else if ((ret = + __repmgr_read_from_site(env, conn)) != 0) + goto err; } if (events.lNetworkEvents & FD_WRITE) { @@ -823,7 +843,7 @@ handle_completion(env, conn) error = events.iErrorCode[FD_WRITE_BIT]; goto report; } else if ((ret = - __repmgr_write_some(env, conn)) != 0) + __repmgr_write_some(env, conn)) != 0) goto err; } @@ -832,7 +852,7 @@ handle_completion(env, conn) error = events.iErrorCode[FD_READ_BIT]; goto report; } else if ((ret = - __repmgr_read_from_site(env, conn)) != 0) + __repmgr_read_from_site(env, conn)) != 0) goto err; } |
