summary | refs | log | tree | commit | diff
path: root/src/rep/rep_record.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/rep/rep_record.c')
-rw-r--r-- src/rep/rep_record.c 315
1 files changed, 275 insertions, 40 deletions
diff --git a/src/rep/rep_record.c b/src/rep/rep_record.c
index f4691974..b206e60e 100644
--- a/src/rep/rep_record.c
+++ b/src/rep/rep_record.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2001, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2001, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -9,13 +9,17 @@
#include "db_config.h"
#include "db_int.h"
+#include "dbinc/blob.h"
#include "dbinc/db_page.h"
#include "dbinc/db_am.h"
#include "dbinc/lock.h"
#include "dbinc/mp.h"
#include "dbinc/txn.h"
-static int __rep_collect_txn __P((ENV *, DB_LSN *, LSN_COLLECTION *));
+static int __rep_collect_txn
+ __P((ENV *, DB_LSN *, LSN_COLLECTION *, DELAYED_BLOB_LIST **));
+static int __rep_remove_delayed_blobs
+ __P((ENV *, db_seq_t, u_int32_t ,DELAYED_BLOB_LIST **));
static int __rep_do_ckp __P((ENV *, DBT *, __rep_control_args *));
static int __rep_fire_newmaster __P((ENV *, u_int32_t, int));
static int __rep_fire_startupdone __P((ENV *, u_int32_t, int));
@@ -153,6 +157,7 @@ __rep_process_message_pp(dbenv, control, rec, eid, ret_lsnp)
DB_LSN *ret_lsnp;
{
ENV *env;
+ DB_THREAD_INFO *ip;
int ret;
env = dbenv->env;
@@ -193,7 +198,9 @@ __rep_process_message_pp(dbenv, control, rec, eid, ret_lsnp)
return (ret);
}
+ ENV_ENTER(env, ip);
ret = __rep_process_message_int(env, control, rec, eid, ret_lsnp);
+ ENV_LEAVE(env, ip);
__dbt_userfree(env, control, rec, NULL);
return (ret);
@@ -289,8 +296,7 @@ __rep_process_message_int(env, control, rec, eid, ret_lsnp)
if (ret_lsnp != NULL)
ZERO_LSN(*ret_lsnp);
- ENV_ENTER(env, ip);
-
+ ENV_GET_THREAD_INFO(env, ip);
REP_PRINT_MESSAGE(env, eid, rp, "rep_process_message", 0);
/*
* Check the version number for both rep and log. If it is
@@ -303,8 +309,7 @@ __rep_process_message_int(env, control, rec, eid, ret_lsnp)
"%lu %d"), (u_long)rp->rep_version,
DB_REPVERSION_MIN);
- ret = EINVAL;
- goto errlock;
+ return (EINVAL);
}
VPRINT(env, (env, DB_VERB_REP_MSGS,
"Received record %lu with old rep version %lu",
@@ -322,8 +327,7 @@ __rep_process_message_int(env, control, rec, eid, ret_lsnp)
__db_errx(env, DB_STR_A("3517",
"unexpected replication message version %lu, expected %d",
"%lu %d"), (u_long)rp->rep_version, DB_REPVERSION);
- ret = EINVAL;
- goto errlock;
+ return (EINVAL);
}
if (rp->log_version < DB_LOGVERSION) {
@@ -332,8 +336,7 @@ __rep_process_message_int(env, control, rec, eid, ret_lsnp)
"unsupported old replication log version %lu, minimum version %d",
"%lu %d"), (u_long)rp->log_version,
DB_LOGVERSION_MIN);
- ret = EINVAL;
- goto errlock;
+ return (EINVAL);
}
VPRINT(env, (env, DB_VERB_REP_MSGS,
"Received record %lu with old log version %lu",
@@ -342,8 +345,7 @@ __rep_process_message_int(env, control, rec, eid, ret_lsnp)
__db_errx(env, DB_STR_A("3519",
"unexpected log record version %lu, expected %d",
"%lu %d"), (u_long)rp->log_version, DB_LOGVERSION);
- ret = EINVAL;
- goto errlock;
+ return (EINVAL);
}
/*
@@ -465,9 +467,14 @@ __rep_process_message_int(env, control, rec, eid, ret_lsnp)
* accept the generation number and participate in future
* elections and communication. Otherwise, I need to hear about
* a new master and sync up.
+ *
+ * But do not do any of this if REP_F_HOLD_GEN is set. In
+ * this case we keep the site at its current gen until we
+ * clear this flag.
*/
- if (rp->rectype == REP_ALIVE ||
- rp->rectype == REP_VOTE1 || rp->rectype == REP_VOTE2) {
+ if ((rp->rectype == REP_ALIVE ||
+ rp->rectype == REP_VOTE1 || rp->rectype == REP_VOTE2) &&
+ !F_ISSET(rep, REP_F_HOLD_GEN)) {
REP_SYSTEM_LOCK(env);
RPRINT(env, (env, DB_VERB_REP_MSGS,
"Updating gen from %lu to %lu",
@@ -593,6 +600,38 @@ __rep_process_message_int(env, control, rec, eid, ret_lsnp)
ret = __rep_allreq(env, rp, eid);
CLIENT_REREQ;
break;
+ case REP_BLOB_ALL_REQ:
+ /* Blobs do not support peer-to-peer. */
+ RECOVERING_SKIP;
+ MASTER_ONLY(rep, rp);
+ ret = __rep_blob_allreq(env, eid, rec);
+ CLIENT_REREQ;
+ break;
+ case REP_BLOB_CHUNK:
+ /* Handle even if in recovery. */
+ CLIENT_ONLY(rep, rp);
+ ret = __rep_blob_chunk(env, eid, ip, rec);
+ if (ret == DB_REP_PAGEDONE)
+ ret = 0;
+ break;
+ case REP_BLOB_CHUNK_REQ:
+ /* Blobs do not support peer-to-peer. */
+ RECOVERING_SKIP;
+ MASTER_ONLY(rep, rp);
+ ret = __rep_blob_chunk_req(env, eid, rec);
+ CLIENT_REREQ;
+ break;
+ case REP_BLOB_UPDATE:
+ CLIENT_ONLY(rep, rp);
+ ret = __rep_blob_update(env, eid, ip, rec);
+ break;
+ case REP_BLOB_UPDATE_REQ:
+ MASTER_ONLY(rep, rp);
+ infop = env->reginfo;
+ renv = infop->primary;
+ MASTER_UPDATE(env, renv);
+ ret = __rep_blob_update_req(env, ip, rec);
+ break;
case REP_BULK_LOG:
RECOVERING_LOG_SKIP;
CLIENT_ONLY(rep, rp);
@@ -1059,8 +1098,6 @@ out:
*ret_lsnp = rp->lsn;
ret = DB_REP_NOTPERM;
}
- __dbt_userfree(env, control, rec, NULL);
- ENV_LEAVE(env, ip);
return (ret);
}
@@ -1290,8 +1327,24 @@ gap_check:
#endif
}
- if (ret == DB_KEYEXIST)
+ if (ret == DB_KEYEXIST) {
+ STAT(rep->stat.st_log_duplicated++);
+#ifdef CONFIG_TEST
+ STAT(rep->stat.st_log_futuredup++);
+#endif
+ if (is_dupp != NULL) {
+ *is_dupp = 1;
+ /*
+ * Could get overwritten by max_lsn later,
+ * but only when returning NOTPERM for a
+ * REPCTL_PERM record, in which case max_lsn
+ * is this log record.
+ */
+ if (ret_lsnp != NULL)
+ *ret_lsnp = lp->ready_lsn;
+ }
ret = 0;
+ }
if (ret != 0 && ret != ENOMEM)
goto done;
@@ -1337,10 +1390,11 @@ gap_check:
* But max_lsn is guaranteed <= ready_lsn, so
* it would be a more conservative LSN to return.
*/
- *ret_lsnp = lp->ready_lsn;
+ if (ret_lsnp != NULL)
+ *ret_lsnp = lp->ready_lsn;
}
LOGCOPY_32(env, &rectype, rec->data);
- if (rectype == DB___txn_regop || rectype == DB___txn_ckp)
+ if (IS_PERM_RECTYPE(rectype))
max_lsn = lp->max_perm_lsn;
/*
* We check REPCTL_LEASE here, because this client may
@@ -1536,6 +1590,7 @@ __rep_process_txn(env, rec)
DB_REP *db_rep;
DB_THREAD_INFO *ip;
DB_TXNHEAD *txninfo;
+ DELAYED_BLOB_LIST *dblp, *dummy;
LSN_COLLECTION lc;
REP *rep;
__txn_regop_args *txn_args;
@@ -1548,12 +1603,12 @@ __rep_process_txn(env, rec)
db_rep = env->rep_handle;
rep = db_rep->region;
logc = NULL;
+ dblp = dummy = NULL;
txn_args = NULL;
txn42_args = NULL;
prep_args = NULL;
txninfo = NULL;
- ENV_ENTER(env, ip);
memset(&data_dbt, 0, sizeof(data_dbt));
if (F_ISSET(env, ENV_THREAD))
F_SET(&data_dbt, DB_DBT_REALLOC);
@@ -1618,8 +1673,19 @@ __rep_process_txn(env, rec)
goto err;
/* Phase 1. Get a list of the LSNs in this transaction, and sort it. */
- if ((ret = __rep_collect_txn(env, &prev_lsn, &lc)) != 0)
+ if ((ret = __rep_collect_txn(env, &prev_lsn, &lc, &dblp)) != 0)
goto err;
+ /* Deal with any child transactions that had to be delayed. */
+ while (dblp != NULL) {
+ if ((ret = __rep_collect_txn(
+ env, &dblp->lsn, &lc, &dummy)) != 0)
+ goto err;
+ DB_ASSERT(env, dummy == NULL);
+ dummy = dblp;
+ dblp = dummy->next;
+ __os_free(env, dummy);
+ dummy = NULL;
+ }
qsort(lc.array, lc.nlsns, sizeof(DB_LSN), __rep_lsn_cmp);
/*
@@ -1627,6 +1693,7 @@ __rep_process_txn(env, rec)
* records. Create a txnlist so that they can keep track of file
* state between records.
*/
+ ENV_GET_THREAD_INFO(env, ip);
if ((ret = __db_txnlist_init(env, ip, 0, 0, NULL, &txninfo)) != 0)
goto err;
@@ -1647,6 +1714,7 @@ __rep_process_txn(env, rec)
(u_long)lsnp->file, (u_long)lsnp->offset);
goto err;
}
+ LOGCOPY_32(env, &rectype, data_dbt.data);
}
err: memset(&req, 0, sizeof(req));
@@ -1658,6 +1726,12 @@ err: memset(&req, 0, sizeof(req));
if ((t_ret = __lock_id_free(env, locker)) != 0 && ret == 0)
ret = t_ret;
+ while (dblp != NULL) {
+ dummy = dblp;
+ dblp = dummy->next;
+ __os_free(env, dummy);
+ }
+
err1: if (txn_args != NULL)
__os_free(env, txn_args);
if (txn42_args != NULL)
@@ -1694,25 +1768,52 @@ err1: if (txn_args != NULL)
* the entire transaction family at once.
*/
static int
-__rep_collect_txn(env, lsnp, lc)
+__rep_collect_txn(env, lsnp, lc, dbl)
ENV *env;
DB_LSN *lsnp;
LSN_COLLECTION *lc;
+ DELAYED_BLOB_LIST **dbl;
{
+ __dbreg_register_args *dbregargp;
__txn_child_args *argp;
DB_LOGC *logc;
DB_LSN c_lsn;
+ DB_REP *db_rep;
DBT data;
- u_int32_t rectype;
+ db_seq_t blob_file_id;
+ u_int32_t child, rectype, skip_txnid;
u_int nalloc;
- int ret, t_ret;
+ int ret, t_ret, view_partial;
+ char *name;
memset(&data, 0, sizeof(data));
F_SET(&data, DB_DBT_REALLOC);
+ skip_txnid = TXN_INVALID;
if ((ret = __log_cursor(env, &logc)) != 0)
return (ret);
+ /*
+ * For partial replication we assume a certain sequence of
+ * log records to detect a database create and skip it if
+ * desired. We are walking backward through the records of
+ * a single transaction right now.
+ *
+ * A create operation is done inside a BDB-owned child txn.
+ * Nothing else is done within this BDB-owned child txn.
+ * The last piece of a create operation is the dbreg_register
+ * log record that records the opening of the file. That
+ * log record contains the child txnid in the 'id' field, and
+ * the file name. At this point we invoke the partial callback
+ * to determine if this database should be replicated. If it
+ * should not be replicated, we need to avoid collecting the
+ * entire child txn referenced in the 'id' field.
+ *
+ * So if processing the dbreg_register record finds a database
+ * to skip, we store the child txnid in 'skip_txnid'. We use
+ * 'skip_txnid' to avoid processing log records or making
+ * recursive calls for that txnid.
+ */
while (!IS_ZERO_LSN(*lsnp) &&
(ret = __logc_get(logc, lsnp, &data, DB_SET)) == 0) {
LOGCOPY_32(env, &rectype, data.data);
@@ -1722,9 +1823,66 @@ __rep_collect_txn(env, lsnp, lc)
goto err;
c_lsn = argp->c_lsn;
*lsnp = argp->prev_lsn;
+ child = argp->child;
__os_free(env, argp);
- ret = __rep_collect_txn(env, &c_lsn, lc);
- } else {
+
+ if (child == skip_txnid && *dbl != NULL &&
+ (*dbl)->child == child)
+ (*dbl)->lsn = c_lsn;
+ /*
+ * If skip_txnid is set, it is the id of the child txn
+ * that creates a database we should skip. So, if
+ * this is that child txn, do not collect it.
+ */
+ if (skip_txnid == TXN_INVALID || child != skip_txnid)
+ ret = __rep_collect_txn(env, &c_lsn, lc, dbl);
+ } else if (IS_VIEW_SITE(env) &&
+ rectype == DB___dbreg_register) {
+ db_rep = env->rep_handle;
+ /*
+ * If we are a view see if this is a file creation
+ * stream. On-disk files have the creating child txn
+ * in the 'id' field and the name. See if this view
+ * wants this file.
+ */
+ if ((ret = __dbreg_register_read(
+ env, data.data, &dbregargp)) != 0)
+ goto err;
+ child = dbregargp->id;
+ name = (char *)dbregargp->name.data;
+ skip_txnid = TXN_INVALID;
+ if (child != TXN_INVALID &&
+ (!IS_DB_FILE(name) || IS_BLOB_META(name))) {
+ /*
+ * The 'id' has a child txn so it is a create.
+ */
+ DB_ASSERT(env, db_rep->partial != NULL);
+ GET_LO_HI(env, dbregargp->blob_fid_lo,
+ dbregargp->blob_fid_hi, blob_file_id, ret);
+ if (ret != 0)
+ goto err;
+ if ((ret = __rep_call_partial(env,
+ name, &view_partial, 0, dbl)) != 0) {
+ VPRINT(env, (env, DB_VERB_REP_MISC,
+ "rep_collect_txn: partial cb err %d for %s", ret, name));
+ __os_free(env, dbregargp);
+ goto err;
+ }
+ /*
+ * Save the child txnid for when we walk back
+ * into the txn_child record.
+ */
+ if (view_partial == 0) {
+ skip_txnid = child;
+ if ((ret =
+ __rep_remove_delayed_blobs(env,
+ blob_file_id, child, dbl)) != 0)
+ goto err;
+ }
+ }
+ __os_free(env, dbregargp);
+ }
+ if (rectype != DB___txn_child) {
if (lc->nalloc < lc->nlsns + 1) {
nalloc = lc->nalloc == 0 ? 20 : lc->nalloc * 2;
if ((ret = __os_realloc(env,
@@ -1761,6 +1919,62 @@ err: if ((t_ret = __logc_close(logc)) != 0 && ret == 0)
}
/*
+ * __rep_remove_delayed_blobs --
+ *
+ * If a blob meta database is opened in the same transaction as the database
+ * that owns it, then deciding whether it should be replicated or not needs
+ * to be delayed until after the rest of the transaction is processed. To do
+ * this, the transaction's information is added to a DELAYED_BLOB_LIST. When
+ * the owning database is processed, if it is not replicated then remove the
+ * entry of its blob meta database from the delayed list.
+ */
+static int
+__rep_remove_delayed_blobs(env, blob_file_id, child, dbl)
+ ENV *env; /* Environment handle; passed to __os_free. */
+ db_seq_t blob_file_id; /* Compared with each entry's blob_file_id. */
+ u_int32_t child; /* Child txnid to stamp on, or exclude from, entries. */
+ DELAYED_BLOB_LIST **dbl; /* In/out: head pointer of the delayed list. */
+{
+ DELAYED_BLOB_LIST *ent, *next, *prev;
+
+ if (*dbl == NULL) /* Empty list: nothing to stamp or remove. */
+ return (0);
+
+ /*
+ * A head entry whose child txnid is still 0 was just added to the
+ * list: stamp it with this child txnid and remove nothing.
+ */
+ if ((*dbl)->child == 0) {
+ (*dbl)->child = child;
+ return (0);
+ }
+
+ if (blob_file_id == 0) /* NOTE(review): 0 presumably means "no blob file" -- confirm. */
+ return (0);
+
+ /*
+ * A blob meta database must not be replicated when its owning database
+ * is not. Unlink and free the first entry with this blob_file_id that
+ * belongs to a different child txn, so it is never processed later.
+ */
+ for (ent = *dbl; ent != NULL; ent = (DELAYED_BLOB_LIST *)ent->next) {
+ if (ent->blob_file_id == blob_file_id && ent->child != child) {
+ next = (DELAYED_BLOB_LIST *)ent->next;
+ prev = (DELAYED_BLOB_LIST *)ent->prev;
+ if (ent == *dbl) /* Removing the head: advance the caller's pointer. */
+ *dbl = next;
+ if (prev != NULL)
+ prev->next = ent->next;
+ if (next != NULL)
+ next->prev = ent->prev;
+ __os_free(env, ent);
+ break; /* ent is freed; stop before the loop step reads ent->next. */
+ }
+ }
+ return (0); /* Every path through this function returns 0. */
+}
+
+/*
* __rep_lsn_cmp --
* qsort-type-compatible wrapper for LOG_COMPARE.
*/
@@ -2138,9 +2352,13 @@ __rep_process_rec(env, ip, rp, rec, ret_tsp, ret_lsnp)
ret = __rep_process_txn(env, rec);
} while (ret == DB_LOCK_DEADLOCK || ret == DB_LOCK_NOTGRANTED);
- /* Now flush the log unless we're running TXN_NOSYNC. */
- if (ret == 0 && !F_ISSET(env->dbenv, DB_ENV_TXN_NOSYNC))
- ret = __log_flush(env, NULL);
+ /* Now write/flush the log as appropriate. */
+ if (ret == 0) {
+ if (F_ISSET(env->dbenv, DB_ENV_TXN_WRITE_NOSYNC))
+ ret = __log_rep_write(env);
+ else if (!F_ISSET(env->dbenv, DB_ENV_TXN_NOSYNC))
+ ret = __log_flush(env, NULL);
+ }
if (ret != 0) {
__db_errx(env, DB_STR_A("3526",
"Error processing txn [%lu][%lu]", "%lu %lu"),
@@ -2256,7 +2474,7 @@ __rep_resend_req(env, rereq)
DB_REP *db_rep;
LOG *lp;
REP *rep;
- int master, ret;
+ int blob_sync, master, ret;
repsync_t sync_state;
u_int32_t gapflags, msgtype, repflags, sendflags;
@@ -2271,6 +2489,7 @@ __rep_resend_req(env, rereq)
repflags = rep->flags;
sync_state = rep->sync_state;
+ blob_sync = rep->blob_sync;
/*
* If we are delayed we do not rerequest anything.
*/
@@ -2293,9 +2512,17 @@ __rep_resend_req(env, rereq)
*/
msgtype = REP_UPDATE_REQ;
} else if (sync_state == SYNC_PAGE) {
- REP_SYSTEM_LOCK(env);
- ret = __rep_pggap_req(env, rep, NULL, gapflags);
- REP_SYSTEM_UNLOCK(env);
+ if (blob_sync == 0) {
+ REP_SYSTEM_LOCK(env);
+ ret = __rep_pggap_req(env, rep, NULL, gapflags);
+ REP_SYSTEM_UNLOCK(env);
+ } else {
+ MUTEX_LOCK(env, rep->mtx_clientdb);
+ REP_SYSTEM_LOCK(env);
+ ret = __rep_blob_rereq(env, rep);
+ REP_SYSTEM_UNLOCK(env);
+ MUTEX_UNLOCK(env, rep->mtx_clientdb);
+ }
} else {
MUTEX_LOCK(env, rep->mtx_clientdb);
ret = __rep_loggap_req(env, rep, NULL, gapflags);
@@ -2397,9 +2624,20 @@ __rep_skip_msg(env, rep, eid, rectype)
if (rep->master_id == DB_EID_INVALID) /* Case 1. */
(void)__rep_send_message(env,
DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0, 0);
- else if (eid == rep->master_id) /* Case 2. */
- ret = __rep_resend_req(env, 0);
- else if (F_ISSET(rep, REP_F_CLIENT)) /* Case 3. */
+ else if (eid == rep->master_id) { /* Case 2. */
+ /*
+ * When we receive log messages in the SYNC_PAGE stage
+ * and we decide to rerequest, it often means the pages
+ * we expect have been dropped. Send a rerequest with
+ * gapflags for better performance.
+ */
+ if ((rectype == REP_LOG || rectype == REP_BULK_LOG ||
+ rectype == REP_LOG_MORE) &&
+ rep->sync_state == SYNC_PAGE)
+ ret = __rep_resend_req(env, 1);
+ else
+ ret = __rep_resend_req(env, 0);
+ } else if (F_ISSET(rep, REP_F_CLIENT)) /* Case 3. */
(void)__rep_send_message(env,
eid, REP_REREQUEST, NULL, NULL, 0, 0);
}
@@ -2421,7 +2659,6 @@ __rep_check_missing(env, gen, master_perm_lsn)
DB_LOG *dblp;
DB_LSN *end_lsn;
DB_REP *db_rep;
- DB_THREAD_INFO *ip;
LOG *lp;
REGINFO *infop;
REP *rep;
@@ -2434,7 +2671,6 @@ __rep_check_missing(env, gen, master_perm_lsn)
infop = env->reginfo;
has_log_gap = has_page_gap = ret = 0;
- ENV_ENTER(env, ip);
MUTEX_LOCK(env, rep->mtx_clientdb);
REP_SYSTEM_LOCK(env);
/*
@@ -2518,8 +2754,7 @@ __rep_check_missing(env, gen, master_perm_lsn)
rep->msg_th--;
REP_SYSTEM_UNLOCK(env);
-out: ENV_LEAVE(env, ip);
- return (ret);
+out: return (ret);
}
static int