diff options
Diffstat (limited to 'src/rep/rep_record.c')
-rw-r--r-- | src/rep/rep_record.c | 315 |
1 files changed, 275 insertions, 40 deletions
diff --git a/src/rep/rep_record.c b/src/rep/rep_record.c index f4691974..b206e60e 100644 --- a/src/rep/rep_record.c +++ b/src/rep/rep_record.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2001, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2001, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -9,13 +9,17 @@ #include "db_config.h" #include "db_int.h" +#include "dbinc/blob.h" #include "dbinc/db_page.h" #include "dbinc/db_am.h" #include "dbinc/lock.h" #include "dbinc/mp.h" #include "dbinc/txn.h" -static int __rep_collect_txn __P((ENV *, DB_LSN *, LSN_COLLECTION *)); +static int __rep_collect_txn + __P((ENV *, DB_LSN *, LSN_COLLECTION *, DELAYED_BLOB_LIST **)); +static int __rep_remove_delayed_blobs + __P((ENV *, db_seq_t, u_int32_t ,DELAYED_BLOB_LIST **)); static int __rep_do_ckp __P((ENV *, DBT *, __rep_control_args *)); static int __rep_fire_newmaster __P((ENV *, u_int32_t, int)); static int __rep_fire_startupdone __P((ENV *, u_int32_t, int)); @@ -153,6 +157,7 @@ __rep_process_message_pp(dbenv, control, rec, eid, ret_lsnp) DB_LSN *ret_lsnp; { ENV *env; + DB_THREAD_INFO *ip; int ret; env = dbenv->env; @@ -193,7 +198,9 @@ __rep_process_message_pp(dbenv, control, rec, eid, ret_lsnp) return (ret); } + ENV_ENTER(env, ip); ret = __rep_process_message_int(env, control, rec, eid, ret_lsnp); + ENV_LEAVE(env, ip); __dbt_userfree(env, control, rec, NULL); return (ret); @@ -289,8 +296,7 @@ __rep_process_message_int(env, control, rec, eid, ret_lsnp) if (ret_lsnp != NULL) ZERO_LSN(*ret_lsnp); - ENV_ENTER(env, ip); - + ENV_GET_THREAD_INFO(env, ip); REP_PRINT_MESSAGE(env, eid, rp, "rep_process_message", 0); /* * Check the version number for both rep and log. If it is @@ -303,8 +309,7 @@ __rep_process_message_int(env, control, rec, eid, ret_lsnp) "%lu %d"), (u_long)rp->rep_version, DB_REPVERSION_MIN); - ret = EINVAL; - goto errlock; + return (EINVAL); } VPRINT(env, (env, DB_VERB_REP_MSGS, "Received record %lu with old rep version %lu", @@ -322,8 +327,7 @@ __rep_process_message_int(env, control, rec, eid, ret_lsnp) __db_errx(env, DB_STR_A("3517", "unexpected replication message version %lu, expected %d", "%lu %d"), (u_long)rp->rep_version, DB_REPVERSION); - ret = EINVAL; - goto errlock; + return (EINVAL); } if (rp->log_version < DB_LOGVERSION) { @@ -332,8 +336,7 @@ __rep_process_message_int(env, control, rec, eid, ret_lsnp) "unsupported old replication log version %lu, minimum version %d", "%lu %d"), (u_long)rp->log_version, DB_LOGVERSION_MIN); - ret = EINVAL; - goto errlock; + return (EINVAL); } VPRINT(env, (env, DB_VERB_REP_MSGS, "Received record %lu with old log version %lu", @@ -342,8 +345,7 @@ __rep_process_message_int(env, control, rec, eid, ret_lsnp) __db_errx(env, DB_STR_A("3519", "unexpected log record version %lu, expected %d", "%lu %d"), (u_long)rp->log_version, DB_LOGVERSION); - ret = EINVAL; - goto errlock; + return (EINVAL); } /* @@ -465,9 +467,14 @@ __rep_process_message_int(env, control, rec, eid, ret_lsnp) * accept the generation number and participate in future * elections and communication. Otherwise, I need to hear about * a new master and sync up. + * + * But do not do any of this if REP_F_HOLD_GEN is set. In + * this case we keep the site at its current gen until we + * clear this flag. */ - if (rp->rectype == REP_ALIVE || - rp->rectype == REP_VOTE1 || rp->rectype == REP_VOTE2) { + if ((rp->rectype == REP_ALIVE || + rp->rectype == REP_VOTE1 || rp->rectype == REP_VOTE2) && + !F_ISSET(rep, REP_F_HOLD_GEN)) { REP_SYSTEM_LOCK(env); RPRINT(env, (env, DB_VERB_REP_MSGS, "Updating gen from %lu to %lu", @@ -593,6 +600,38 @@ __rep_process_message_int(env, control, rec, eid, ret_lsnp) ret = __rep_allreq(env, rp, eid); CLIENT_REREQ; break; + case REP_BLOB_ALL_REQ: + /* Blobs do not support peer-to-peer. */ + RECOVERING_SKIP; + MASTER_ONLY(rep, rp); + ret = __rep_blob_allreq(env, eid, rec); + CLIENT_REREQ; + break; + case REP_BLOB_CHUNK: + /* Handle even if in recovery. */ + CLIENT_ONLY(rep, rp); + ret = __rep_blob_chunk(env, eid, ip, rec); + if (ret == DB_REP_PAGEDONE) + ret = 0; + break; + case REP_BLOB_CHUNK_REQ: + /* Blobs do not support peer-to-peer. */ + RECOVERING_SKIP; + MASTER_ONLY(rep, rp); + ret = __rep_blob_chunk_req(env, eid, rec); + CLIENT_REREQ; + break; + case REP_BLOB_UPDATE: + CLIENT_ONLY(rep, rp); + ret = __rep_blob_update(env, eid, ip, rec); + break; + case REP_BLOB_UPDATE_REQ: + MASTER_ONLY(rep, rp); + infop = env->reginfo; + renv = infop->primary; + MASTER_UPDATE(env, renv); + ret = __rep_blob_update_req(env, ip, rec); + break; case REP_BULK_LOG: RECOVERING_LOG_SKIP; CLIENT_ONLY(rep, rp); @@ -1059,8 +1098,6 @@ out: *ret_lsnp = rp->lsn; ret = DB_REP_NOTPERM; } - __dbt_userfree(env, control, rec, NULL); - ENV_LEAVE(env, ip); return (ret); } @@ -1290,8 +1327,24 @@ gap_check: #endif } - if (ret == DB_KEYEXIST) + if (ret == DB_KEYEXIST) { + STAT(rep->stat.st_log_duplicated++); +#ifdef CONFIG_TEST + STAT(rep->stat.st_log_futuredup++); +#endif + if (is_dupp != NULL) { + *is_dupp = 1; + /* + * Could get overwritten by max_lsn later, + * but only when returning NOTPERM for a + * REPCTL_PERM record, in which case max_lsn + * is this log record. + */ + if (ret_lsnp != NULL) + *ret_lsnp = lp->ready_lsn; + } ret = 0; + } if (ret != 0 && ret != ENOMEM) goto done; @@ -1337,10 +1390,11 @@ gap_check: * But max_lsn is guaranteed <= ready_lsn, so * it would be a more conservative LSN to return. */ - *ret_lsnp = lp->ready_lsn; + if (ret_lsnp != NULL) + *ret_lsnp = lp->ready_lsn; } LOGCOPY_32(env, &rectype, rec->data); - if (rectype == DB___txn_regop || rectype == DB___txn_ckp) + if (IS_PERM_RECTYPE(rectype)) max_lsn = lp->max_perm_lsn; /* * We check REPCTL_LEASE here, because this client may @@ -1536,6 +1590,7 @@ __rep_process_txn(env, rec) DB_REP *db_rep; DB_THREAD_INFO *ip; DB_TXNHEAD *txninfo; + DELAYED_BLOB_LIST *dblp, *dummy; LSN_COLLECTION lc; REP *rep; __txn_regop_args *txn_args; @@ -1548,12 +1603,12 @@ __rep_process_txn(env, rec) db_rep = env->rep_handle; rep = db_rep->region; logc = NULL; + dblp = dummy = NULL; txn_args = NULL; txn42_args = NULL; prep_args = NULL; txninfo = NULL; - ENV_ENTER(env, ip); memset(&data_dbt, 0, sizeof(data_dbt)); if (F_ISSET(env, ENV_THREAD)) F_SET(&data_dbt, DB_DBT_REALLOC); @@ -1618,8 +1673,19 @@ __rep_process_txn(env, rec) goto err; /* Phase 1. Get a list of the LSNs in this transaction, and sort it. */ - if ((ret = __rep_collect_txn(env, &prev_lsn, &lc)) != 0) + if ((ret = __rep_collect_txn(env, &prev_lsn, &lc, &dblp)) != 0) goto err; + /* Deal with any child transactions that had to be delayed. */ + while (dblp != NULL) { + if ((ret = __rep_collect_txn( + env, &dblp->lsn, &lc, &dummy)) != 0) + goto err; + DB_ASSERT(env, dummy == NULL); + dummy = dblp; + dblp = dummy->next; + __os_free(env, dummy); + dummy = NULL; + } qsort(lc.array, lc.nlsns, sizeof(DB_LSN), __rep_lsn_cmp); /* @@ -1627,6 +1693,7 @@ __rep_process_txn(env, rec) * records. Create a txnlist so that they can keep track of file * state between records. */ + ENV_GET_THREAD_INFO(env, ip); if ((ret = __db_txnlist_init(env, ip, 0, 0, NULL, &txninfo)) != 0) goto err; @@ -1647,6 +1714,7 @@ __rep_process_txn(env, rec) (u_long)lsnp->file, (u_long)lsnp->offset); goto err; } + LOGCOPY_32(env, &rectype, data_dbt.data); } err: memset(&req, 0, sizeof(req)); @@ -1658,6 +1726,12 @@ err: memset(&req, 0, sizeof(req)); if ((t_ret = __lock_id_free(env, locker)) != 0 && ret == 0) ret = t_ret; + while (dblp != NULL) { + dummy = dblp; + dblp = dummy->next; + __os_free(env, dummy); + } + err1: if (txn_args != NULL) __os_free(env, txn_args); if (txn42_args != NULL) @@ -1694,25 +1768,52 @@ err1: if (txn_args != NULL) * the entire transaction family at once. */ static int -__rep_collect_txn(env, lsnp, lc) +__rep_collect_txn(env, lsnp, lc, dbl) ENV *env; DB_LSN *lsnp; LSN_COLLECTION *lc; + DELAYED_BLOB_LIST **dbl; { + __dbreg_register_args *dbregargp; __txn_child_args *argp; DB_LOGC *logc; DB_LSN c_lsn; + DB_REP *db_rep; DBT data; - u_int32_t rectype; + db_seq_t blob_file_id; + u_int32_t child, rectype, skip_txnid; u_int nalloc; - int ret, t_ret; + int ret, t_ret, view_partial; + char *name; memset(&data, 0, sizeof(data)); F_SET(&data, DB_DBT_REALLOC); + skip_txnid = TXN_INVALID; if ((ret = __log_cursor(env, &logc)) != 0) return (ret); + /* + * For partial replication we assume a certain sequence of + * log records to detect a database create and skip it if + * desired. We are walking backward through the records of + * a single transaction right now. + * + * A create operation is done inside a BDB-owned child txn. + * Nothing else is done within this BDB-owned child txn. + * The last piece of a create operations is the dbreg_register + * log record that records the opening of the file. That + * log record contains the child txnid in the 'id' field, and + * the file name. At this point we invoke the partial callback + * to determine if this database should be replicated. If it + * should not be replicated, we need to avoid collecting the + * entire child txn referenced in the 'id' field. + * + * So if processing the dbreg_register record finds a database + * to skip, we store the child txnid in 'skip_txnid'. We use + * 'skip_txnid' to avoid processing log records or making + * recursive calls for that txnid. + */ while (!IS_ZERO_LSN(*lsnp) && (ret = __logc_get(logc, lsnp, &data, DB_SET)) == 0) { LOGCOPY_32(env, &rectype, data.data); @@ -1722,9 +1823,66 @@ __rep_collect_txn(env, lsnp, lc) goto err; c_lsn = argp->c_lsn; *lsnp = argp->prev_lsn; + child = argp->child; __os_free(env, argp); - ret = __rep_collect_txn(env, &c_lsn, lc); - } else { + + if (child == skip_txnid && *dbl != NULL && + (*dbl)->child == child) + (*dbl)->lsn = c_lsn; + /* + * If skip_txnid is set, it is the id of the child txnid + * that creates a database we should skip. So, if + * this is that child txn, do not collect it. + */ + if (skip_txnid == TXN_INVALID || child != skip_txnid) + ret = __rep_collect_txn(env, &c_lsn, lc, dbl); + } else if (IS_VIEW_SITE(env) && + rectype == DB___dbreg_register) { + db_rep = env->rep_handle; + /* + * If we are a view see if this is a file creation + * stream. On-disk files have the creating child txn + * in the 'id' field and the name. See if this view + * wants this file. + */ + if ((ret = __dbreg_register_read( + env, data.data, &dbregargp)) != 0) + goto err; + child = dbregargp->id; + name = (char *)dbregargp->name.data; + skip_txnid = TXN_INVALID; + if (child != TXN_INVALID && + (!IS_DB_FILE(name) || IS_BLOB_META(name))) { + /* + * The 'id' has a child txn so it is a create. + */ + DB_ASSERT(env, db_rep->partial != NULL); + GET_LO_HI(env, dbregargp->blob_fid_lo, + dbregargp->blob_fid_hi, blob_file_id, ret); + if (ret != 0) + goto err; + if ((ret = __rep_call_partial(env, + name, &view_partial, 0, dbl)) != 0) { + VPRINT(env, (env, DB_VERB_REP_MISC, + "rep_collect_txn: partial cb err %d for %s", ret, name)); + __os_free(env, dbregargp); + goto err; + } + /* + * Save the child txnid for when we walk back + * into the txn_child record. + */ + if (view_partial == 0) { + skip_txnid = child; + if ((ret = + __rep_remove_delayed_blobs(env, + blob_file_id, child, dbl)) != 0) + goto err; + } + } + __os_free(env, dbregargp); + } + if (rectype != DB___txn_child) { if (lc->nalloc < lc->nlsns + 1) { nalloc = lc->nalloc == 0 ? 20 : lc->nalloc * 2; if ((ret = __os_realloc(env, @@ -1761,6 +1919,62 @@ err: if ((t_ret = __logc_close(logc)) != 0 && ret == 0) } /* + * __rep_remove_delayed_blobs -- + * + * If a blob meta database is opened in the same transaction as the database + * that owns it, then deciding whether it should be replicated or not needs + * to be delayed until after the rest of the transaction is processed. To do + * this, the transaction's information is added to a DELAYED_BLOB_LIST. When + * the owning database is processed, if it is not replicated then remove the + * entry of its blob meta database from the delayed list. + */ +static int +__rep_remove_delayed_blobs(env, blob_file_id, child, dbl) + ENV *env; + db_seq_t blob_file_id; + u_int32_t child; + DELAYED_BLOB_LIST **dbl; +{ + DELAYED_BLOB_LIST *ent, *next, *prev; + + if (*dbl == NULL) + return (0); + + /* + * If the child transaction has not been set, then a new entry was just + * added to the list. + */ + if ((*dbl)->child == 0) { + (*dbl)->child = child; + return (0); + } + + if (blob_file_id == 0) + return (0); + + /* + * This blob meta database should not be replicated if its associated + * database is not replicated. Remove it from the delayed + * list so it will not be processed at a later time. + */ + for (ent = *dbl; ent != NULL; ent = (DELAYED_BLOB_LIST *)ent->next) { + if (ent->blob_file_id == blob_file_id && ent->child != child) { + next = (DELAYED_BLOB_LIST *)ent->next; + prev = (DELAYED_BLOB_LIST *)ent->prev; + if (ent == *dbl) + *dbl = next; + if (prev != NULL) + prev->next = ent->next; + if (next != NULL) + next->prev = ent->prev; + __os_free(env, ent); + break; + } + } + return (0); +} + +/* * __rep_lsn_cmp -- * qsort-type-compatible wrapper for LOG_COMPARE. */ @@ -2138,9 +2352,13 @@ __rep_process_rec(env, ip, rp, rec, ret_tsp, ret_lsnp) ret = __rep_process_txn(env, rec); } while (ret == DB_LOCK_DEADLOCK || ret == DB_LOCK_NOTGRANTED); - /* Now flush the log unless we're running TXN_NOSYNC. */ - if (ret == 0 && !F_ISSET(env->dbenv, DB_ENV_TXN_NOSYNC)) - ret = __log_flush(env, NULL); + /* Now write/flush the log as appropriate. */ + if (ret == 0) { + if (F_ISSET(env->dbenv, DB_ENV_TXN_WRITE_NOSYNC)) + ret = __log_rep_write(env); + else if (!F_ISSET(env->dbenv, DB_ENV_TXN_NOSYNC)) + ret = __log_flush(env, NULL); + } if (ret != 0) { __db_errx(env, DB_STR_A("3526", "Error processing txn [%lu][%lu]", "%lu %lu"), @@ -2256,7 +2474,7 @@ __rep_resend_req(env, rereq) DB_REP *db_rep; LOG *lp; REP *rep; - int master, ret; + int blob_sync, master, ret; repsync_t sync_state; u_int32_t gapflags, msgtype, repflags, sendflags; @@ -2271,6 +2489,7 @@ __rep_resend_req(env, rereq) repflags = rep->flags; sync_state = rep->sync_state; + blob_sync = rep->blob_sync; /* * If we are delayed we do not rerequest anything. */ @@ -2293,9 +2512,17 @@ __rep_resend_req(env, rereq) */ msgtype = REP_UPDATE_REQ; } else if (sync_state == SYNC_PAGE) { - REP_SYSTEM_LOCK(env); - ret = __rep_pggap_req(env, rep, NULL, gapflags); - REP_SYSTEM_UNLOCK(env); + if (blob_sync == 0) { + REP_SYSTEM_LOCK(env); + ret = __rep_pggap_req(env, rep, NULL, gapflags); + REP_SYSTEM_UNLOCK(env); + } else { + MUTEX_LOCK(env, rep->mtx_clientdb); + REP_SYSTEM_LOCK(env); + ret = __rep_blob_rereq(env, rep); + REP_SYSTEM_UNLOCK(env); + MUTEX_UNLOCK(env, rep->mtx_clientdb); + } } else { MUTEX_LOCK(env, rep->mtx_clientdb); ret = __rep_loggap_req(env, rep, NULL, gapflags); @@ -2397,9 +2624,20 @@ __rep_skip_msg(env, rep, eid, rectype) if (rep->master_id == DB_EID_INVALID) /* Case 1. */ (void)__rep_send_message(env, DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0, 0); - else if (eid == rep->master_id) /* Case 2. */ - ret = __rep_resend_req(env, 0); - else if (F_ISSET(rep, REP_F_CLIENT)) /* Case 3. */ + else if (eid == rep->master_id) { /* Case 2. */ + /* + * When we receive log messages in the SYNC_PAGE stage + * and we decide to rerequest, it often means the pages + * we expect have been dropped. Send a rerequest with + * gapflags for better performance. + */ + if ((rectype == REP_LOG || rectype == REP_BULK_LOG || + rectype == REP_LOG_MORE) && + rep->sync_state == SYNC_PAGE) + ret = __rep_resend_req(env, 1); + else + ret = __rep_resend_req(env, 0); + } else if (F_ISSET(rep, REP_F_CLIENT)) /* Case 3. */ (void)__rep_send_message(env, eid, REP_REREQUEST, NULL, NULL, 0, 0); } @@ -2421,7 +2659,6 @@ __rep_check_missing(env, gen, master_perm_lsn) DB_LOG *dblp; DB_LSN *end_lsn; DB_REP *db_rep; - DB_THREAD_INFO *ip; LOG *lp; REGINFO *infop; REP *rep; @@ -2434,7 +2671,6 @@ __rep_check_missing(env, gen, master_perm_lsn) infop = env->reginfo; has_log_gap = has_page_gap = ret = 0; - ENV_ENTER(env, ip); MUTEX_LOCK(env, rep->mtx_clientdb); REP_SYSTEM_LOCK(env); /* @@ -2518,8 +2754,7 @@ __rep_check_missing(env, gen, master_perm_lsn) rep->msg_th--; REP_SYSTEM_UNLOCK(env); -out: ENV_LEAVE(env, ip); - return (ret); +out: return (ret); } static int |