diff options
Diffstat (limited to 'src/rep/rep_log.c')
-rw-r--r-- | src/rep/rep_log.c | 1060 |
1 files changed, 1060 insertions, 0 deletions
diff --git a/src/rep/rep_log.c b/src/rep/rep_log.c new file mode 100644 index 00000000..42300685 --- /dev/null +++ b/src/rep/rep_log.c @@ -0,0 +1,1060 @@ +/*- + * See the file LICENSE for redistribution information. + * + * Copyright (c) 2004, 2012 Oracle and/or its affiliates. All rights reserved. + * + * $Id$ + */ + +#include "db_config.h" + +#include "db_int.h" +#include "dbinc/log.h" + +static int __rep_chk_newfile __P((ENV *, DB_LOGC *, REP *, + __rep_control_args *, int)); +static int __rep_log_split __P((ENV *, DB_THREAD_INFO *, + __rep_control_args *, DBT *, DB_LSN *, DB_LSN *)); + +/* + * __rep_allreq -- + * Handle a REP_ALL_REQ message. + * + * PUBLIC: int __rep_allreq __P((ENV *, __rep_control_args *, int)); + */ +int +__rep_allreq(env, rp, eid) + ENV *env; + __rep_control_args *rp; + int eid; +{ + DBT data_dbt, newfiledbt; + DB_LOGC *logc; + DB_LSN log_end, oldfilelsn; + DB_REP *db_rep; + REP *rep; + REP_BULK bulk; + REP_THROTTLE repth; + __rep_newfile_args nf_args; + uintptr_t bulkoff; + u_int32_t bulkflags, end_flag, flags, use_bulk; + int arch_flag, ret, t_ret; + u_int8_t buf[__REP_NEWFILE_SIZE]; + size_t len; + + ret = 0; + db_rep = env->rep_handle; + rep = db_rep->region; + end_flag = 0; + arch_flag = 0; + + if ((ret = __log_cursor(env, &logc)) != 0) + return (ret); + memset(&data_dbt, 0, sizeof(data_dbt)); + /* + * If we're doing bulk transfer, allocate a bulk buffer to put our + * log records in. We still need to initialize the throttle info + * because if we encounter a log record larger than our entire bulk + * buffer, we need to send it as a singleton and also we want to + * support throttling with bulk. + * + * Use a local var so we don't need to worry if someone else turns + * on/off bulk in the middle of our call. + */ + use_bulk = FLD_ISSET(rep->config, REP_C_BULK); + bulk.addr = NULL; + if (use_bulk && (ret = __rep_bulk_alloc(env, &bulk, eid, + &bulkoff, &bulkflags, REP_BULK_LOG)) != 0) + goto err; + memset(&repth, 0, sizeof(repth)); + REP_SYSTEM_LOCK(env); + if ((ret = __rep_lockout_archive(env, rep)) != 0) { + REP_SYSTEM_UNLOCK(env); + goto err; + } + arch_flag = 1; + repth.gbytes = rep->gbytes; + repth.bytes = rep->bytes; + oldfilelsn = repth.lsn = rp->lsn; + repth.type = REP_LOG; + repth.data_dbt = &data_dbt; + REP_SYSTEM_UNLOCK(env); + + /* + * Get the LSN of the end of the log, so that in our reading loop + * (below), we can recognize when we get there, and set the + * REPCTL_LOG_END flag. + */ + if ((ret = __logc_get(logc, &log_end, &data_dbt, DB_LAST)) != 0) { + if (ret == DB_NOTFOUND && F_ISSET(rep, REP_F_MASTER)) + ret = 0; + goto err; + } + + flags = IS_ZERO_LSN(rp->lsn) || + IS_INIT_LSN(rp->lsn) ? DB_FIRST : DB_SET; + /* + * We get the first item so that a client servicing requests + * can distinguish between not having the records and reaching + * the end of its log. Return the DB_NOTFOUND if the client + * cannot get the record. Return 0 if we finish the loop and + * sent all that we have. + */ + ret = __logc_get(logc, &repth.lsn, &data_dbt, flags); + /* + * If the client is asking for all records + * because it doesn't have any, and our first + * record is not in the first log file, then + * the client is outdated and needs to get a + * VERIFY_FAIL. + */ + if (ret == 0 && repth.lsn.file != 1 && flags == DB_FIRST) { + if (F_ISSET(rep, REP_F_CLIENT)) + ret = DB_NOTFOUND; + else + (void)__rep_send_message(env, eid, + REP_VERIFY_FAIL, &repth.lsn, NULL, 0, 0); + goto err; + } + /* + * If we got DB_NOTFOUND it could be because the LSN we were + * given is at the end of the log file and we need to switch + * log files. Reinitialize and get the current record when we return. + */ + if (ret == DB_NOTFOUND) { + ret = __rep_chk_newfile(env, logc, rep, rp, eid); + /* + * If we still get DB_NOTFOUND the client gave us a + * bad or unknown LSN. Ignore it if we're the master. + * Any other error is returned. + */ + if (ret == 0) + ret = __logc_get(logc, &repth.lsn, + &data_dbt, DB_CURRENT); + if (ret == DB_NOTFOUND && F_ISSET(rep, REP_F_MASTER)) { + ret = 0; + goto err; + } + if (ret != 0) + goto err; + } + + /* + * For singleton log records, we break when we get a REP_LOG_MORE. + * Or if we're not using throttling, or we are using bulk, we stop + * when we reach the end (i.e. ret != 0). + */ + for (end_flag = 0; + ret == 0 && repth.type != REP_LOG_MORE && end_flag == 0; + ret = __logc_get(logc, &repth.lsn, &data_dbt, DB_NEXT)) { + /* + * If we just changed log files, we need to send the + * version of this log file to the client. + */ + if (repth.lsn.file != oldfilelsn.file) { + if ((ret = __logc_version(logc, &nf_args.version)) != 0) + break; + memset(&newfiledbt, 0, sizeof(newfiledbt)); + if (rep->version < DB_REPVERSION_47) + DB_INIT_DBT(newfiledbt, &nf_args.version, + sizeof(nf_args.version)); + else { + if ((ret = __rep_newfile_marshal(env, &nf_args, + buf, __REP_NEWFILE_SIZE, &len)) != 0) + goto err; + DB_INIT_DBT(newfiledbt, buf, len); + } + (void)__rep_send_message(env, + eid, REP_NEWFILE, &oldfilelsn, &newfiledbt, + REPCTL_RESEND, 0); + } + + /* + * Mark the end of the ALL_REQ response to show that the + * receiving client should now be "caught up" with the + * replication group. If we're the master, then our log end is + * certainly authoritative. If we're another client, only if we + * ourselves have reached STARTUPDONE. + */ + end_flag = (LOG_COMPARE(&repth.lsn, &log_end) >= 0 && + (F_ISSET(rep, REP_F_MASTER) || + rep->stat.st_startup_complete)) ? + REPCTL_LOG_END : 0; + /* + * If we are configured for bulk, try to send this as a bulk + * request. If not configured, or it is too big for bulk + * then just send normally. + */ + if (use_bulk) + ret = __rep_bulk_message(env, &bulk, &repth, + &repth.lsn, &data_dbt, (REPCTL_RESEND | end_flag)); + if (!use_bulk || ret == DB_REP_BULKOVF) + ret = __rep_send_throttle(env, + eid, &repth, 0, end_flag); + if (ret != 0) + break; + /* + * If we are about to change files, then we'll need the + * last LSN in the previous file. Save it here. + */ + oldfilelsn = repth.lsn; + oldfilelsn.offset += logc->len; + } + + if (ret == DB_NOTFOUND || ret == DB_REP_UNAVAIL) + ret = 0; + /* + * We're done, force out whatever remains in the bulk buffer and + * free it. + */ +err: + /* + * We could have raced an unlink from an earlier log_archive + * and the user is removing the files themselves, now. If + * we get an error indicating the log file might no longer + * exist, ignore it. + */ + if (ret == ENOENT) + ret = 0; + if (bulk.addr != NULL && (t_ret = __rep_bulk_free(env, &bulk, + (REPCTL_RESEND | end_flag))) != 0 && ret == 0 && + t_ret != DB_REP_UNAVAIL) + ret = t_ret; + if (arch_flag) { + REP_SYSTEM_LOCK(env); + FLD_CLR(rep->lockout_flags, REP_LOCKOUT_ARCHIVE); + REP_SYSTEM_UNLOCK(env); + } + if ((t_ret = __logc_close(logc)) != 0 && ret == 0) + ret = t_ret; + return (ret); +} + +/* + * __rep_log -- + * Handle a REP_LOG/REP_LOG_MORE message. + * + * PUBLIC: int __rep_log __P((ENV *, DB_THREAD_INFO *, + * PUBLIC: __rep_control_args *, DBT *, int, time_t, DB_LSN *)); + */ +int +__rep_log(env, ip, rp, rec, eid, savetime, ret_lsnp) + ENV *env; + DB_THREAD_INFO *ip; + __rep_control_args *rp; + DBT *rec; + int eid; + time_t savetime; + DB_LSN *ret_lsnp; +{ + DB_LOG *dblp; + DB_LSN last_lsn, lsn; + DB_REP *db_rep; + LOG *lp; + REP *rep; + int is_dup, master, ret; + u_int32_t gapflags; + + is_dup = ret = 0; + db_rep = env->rep_handle; + rep = db_rep->region; + dblp = env->lg_handle; + lp = dblp->reginfo.primary; + + ret = __rep_apply(env, ip, rp, rec, ret_lsnp, &is_dup, &last_lsn); + switch (ret) { + /* + * We're in an internal backup and we've gotten + * all the log we need to run recovery. Do so now. + */ + case DB_REP_LOGREADY: + if ((ret = + __rep_logready(env, rep, savetime, &last_lsn)) != 0) + goto out; + break; + /* + * If we get any of the "normal" returns, we only process + * LOG_MORE if this is not a duplicate record. If the + * record is a duplicate we don't want to handle LOG_MORE + * and request a multiple data stream (or trigger internal + * initialization) since this could be a very old record + * that no longer exists on the master. + */ + case DB_REP_ISPERM: + case DB_REP_NOTPERM: + case 0: + if (is_dup) + goto out; + else + break; + /* + * Any other return (errors), we're done. + */ + default: + goto out; + } + if (rp->rectype == REP_LOG_MORE) { + master = rep->master_id; + + /* + * Keep the cycle from stalling: In case we got the LOG_MORE out + * of order, before some preceding log records, we want to make + * sure our follow-up request resumes from where the LOG_MORE + * said it should. (If the preceding log records never arrive, + * normal gap processing should take care of asking for them.) + * But if we already have this record and/or more, we need to + * ask to resume from what we need. The upshot is we need the + * max of lp->lsn and the lsn from the message. + */ + MUTEX_LOCK(env, rep->mtx_clientdb); + lsn = lp->ready_lsn; + if (LOG_COMPARE(&rp->lsn, &lsn) > 0) + lsn = rp->lsn; + + /* + * If the master_id is invalid, this means that since + * the last record was sent, somebody declared an + * election and we may not have a master to request + * things of. + * + * This is not an error; when we find a new master, + * we'll re-negotiate where the end of the log is and + * try to bring ourselves up to date again anyway. + */ + if (master == DB_EID_INVALID) { + ret = 0; + MUTEX_UNLOCK(env, rep->mtx_clientdb); + goto out; + } + /* + * If we're waiting for records, set the wait_ts + * high so that we avoid re-requesting too soon and + * end up with multiple data streams. + */ + if (IS_ZERO_LSN(lp->waiting_lsn)) + lp->wait_ts = rep->max_gap; + /* + * If preceding log records were from the master, send the + * request for further log records to the master instead of + * allowing it to default to ANYWHERE. + */ + gapflags = REP_GAP_FORCE; + if (master == eid) + gapflags = gapflags | REP_GAP_REREQUEST; + ret = __rep_loggap_req(env, rep, &lsn, gapflags); + MUTEX_UNLOCK(env, rep->mtx_clientdb); + } +out: + return (ret); +} + +/* + * __rep_bulk_log -- + * Handle a REP_BULK_LOG message. + * + * PUBLIC: int __rep_bulk_log __P((ENV *, DB_THREAD_INFO *, + * PUBLIC: __rep_control_args *, DBT *, time_t, DB_LSN *)); + */ +int +__rep_bulk_log(env, ip, rp, rec, savetime, ret_lsnp) + ENV *env; + DB_THREAD_INFO *ip; + __rep_control_args *rp; + DBT *rec; + time_t savetime; + DB_LSN *ret_lsnp; +{ + DB_LSN last_lsn; + DB_REP *db_rep; + REP *rep; + int ret; + + db_rep = env->rep_handle; + rep = db_rep->region; + + ret = __rep_log_split(env, ip, rp, rec, ret_lsnp, &last_lsn); + switch (ret) { + /* + * We're in an internal backup and we've gotten + * all the log we need to run recovery. Do so now. + */ + case DB_REP_LOGREADY: + ret = __rep_logready(env, rep, savetime, &last_lsn); + break; + /* + * Any other return (errors), we're done. + */ + default: + break; + } + return (ret); +} + +/* + * __rep_log_split -- + * - Split a log buffer into individual records. + * + * This is used by a client to process a bulk log message from the + * master and convert it into individual __rep_apply requests. + */ +static int +__rep_log_split(env, ip, rp, rec, ret_lsnp, last_lsnp) + ENV *env; + DB_THREAD_INFO *ip; + __rep_control_args *rp; + DBT *rec; + DB_LSN *ret_lsnp; + DB_LSN *last_lsnp; +{ + DBT logrec; + DB_LSN next_new_lsn, save_lsn, tmp_lsn; + __rep_control_args tmprp; + __rep_bulk_args b_args; + int is_dup, ret, save_ret; + u_int32_t save_flags; + u_int8_t *p, *ep; + + memset(&logrec, 0, sizeof(logrec)); + ZERO_LSN(next_new_lsn); + ZERO_LSN(save_lsn); + ZERO_LSN(tmp_lsn); + /* + * We're going to be modifying the rp LSN contents so make + * our own private copy to play with. + */ + memcpy(&tmprp, rp, sizeof(tmprp)); + /* + * We send the bulk buffer on a PERM record, so often we will have + * DB_LOG_PERM set. However, we only want to mark the last LSN + * we have as a PERM record. So clear it here, and when we're on + * the last record below, set it. The same applies if the sender + * set REPCTL_LOG_END on this message. We want the end of the + * bulk buffer to be marked as the end. + */ + save_flags = F_ISSET(rp, REPCTL_LOG_END | REPCTL_PERM); + F_CLR(&tmprp, REPCTL_LOG_END | REPCTL_PERM); + is_dup = ret = save_ret = 0; + for (ep = (u_int8_t *)rec->data + rec->size, p = (u_int8_t *)rec->data; + p < ep; ) { + /* + * First thing in the buffer is the length. Then the LSN + * of this record, then the record itself. + */ + if (rp->rep_version < DB_REPVERSION_47) { + memcpy(&b_args.len, p, sizeof(b_args.len)); + p += sizeof(b_args.len); + memcpy(&tmprp.lsn, p, sizeof(DB_LSN)); + p += sizeof(DB_LSN); + logrec.data = p; + logrec.size = b_args.len; + p += b_args.len; + } else { + if ((ret = __rep_bulk_unmarshal(env, + &b_args, p, rec->size, &p)) != 0) + return (ret); + tmprp.lsn = b_args.lsn; + logrec.data = b_args.bulkdata.data; + logrec.size = b_args.len; + } + VPRINT(env, (env, DB_VERB_REP_MISC, + "log_rep_split: Processing LSN [%lu][%lu]", + (u_long)tmprp.lsn.file, (u_long)tmprp.lsn.offset)); + VPRINT(env, (env, DB_VERB_REP_MISC, + "log_rep_split: p %#lx ep %#lx logrec data %#lx, size %lu (%#lx)", + P_TO_ULONG(p), P_TO_ULONG(ep), P_TO_ULONG(logrec.data), + (u_long)logrec.size, (u_long)logrec.size)); + if (p >= ep && save_flags) + F_SET(&tmprp, save_flags); + /* + * A previous call to __rep_apply indicated an earlier + * record is a dup and the next_new_lsn we are waiting for. + * Skip log records until we catch up with next_new_lsn. + */ + if (is_dup && LOG_COMPARE(&tmprp.lsn, &next_new_lsn) < 0) { + VPRINT(env, (env, DB_VERB_REP_MISC, + "log_split: Skip dup LSN [%lu][%lu]", + (u_long)tmprp.lsn.file, (u_long)tmprp.lsn.offset)); + continue; + } + is_dup = 0; + ret = __rep_apply(env, ip, + &tmprp, &logrec, &tmp_lsn, &is_dup, last_lsnp); + VPRINT(env, (env, DB_VERB_REP_MISC, + "log_split: rep_apply ret %d, dup %d, tmp_lsn [%lu][%lu]", + ret, is_dup, (u_long)tmp_lsn.file, (u_long)tmp_lsn.offset)); + if (is_dup) + next_new_lsn = tmp_lsn; + switch (ret) { + /* + * If we received the pieces we need for running recovery, + * short-circuit because recovery will truncate the log to + * the LSN we want anyway. + */ + case DB_REP_LOGREADY: + goto out; + /* + * If we just handled a special record, retain that information. + */ + case DB_REP_ISPERM: + case DB_REP_NOTPERM: + save_ret = ret; + save_lsn = tmp_lsn; + ret = 0; + break; + /* + * Normal processing, do nothing, just continue. + */ + case 0: + break; + /* + * If we get an error, then stop immediately. + */ + default: + goto out; + } + } +out: + /* + * If we finish processing successfully, set our return values + * based on what we saw. + */ + if (ret == 0) { + ret = save_ret; + *ret_lsnp = save_lsn; + } + return (ret); +} + +/* + * __rep_log_req -- + * Handle a REP_LOG_REQ message. + * + * PUBLIC: int __rep_logreq __P((ENV *, __rep_control_args *, DBT *, int)); + */ +int +__rep_logreq(env, rp, rec, eid) + ENV *env; + __rep_control_args *rp; + DBT *rec; + int eid; +{ + DBT data_dbt, newfiledbt; + DB_LOGC *logc; + DB_LSN firstlsn, lsn, oldfilelsn; + DB_REP *db_rep; + REP *rep; + REP_BULK bulk; + REP_THROTTLE repth; + __rep_logreq_args lr_args; + __rep_newfile_args nf_args; + uintptr_t bulkoff; + u_int32_t bulkflags, use_bulk; + int count, ret, t_ret; + u_int8_t buf[__REP_NEWFILE_SIZE]; + size_t len; + + ret = 0; + db_rep = env->rep_handle; + rep = db_rep->region; + + /* COMPQUIET_LSN is what this is... */ + ZERO_LSN(lr_args.endlsn); + + if (rec != NULL && rec->size != 0) { + if (rp->rep_version < DB_REPVERSION_47) + lr_args.endlsn = *(DB_LSN *)rec->data; + else if ((ret = __rep_logreq_unmarshal(env, &lr_args, + rec->data, rec->size, NULL)) != 0) + return (ret); + RPRINT(env, (env, DB_VERB_REP_MISC, + "[%lu][%lu]: LOG_REQ max lsn: [%lu][%lu]", + (u_long) rp->lsn.file, (u_long)rp->lsn.offset, + (u_long)lr_args.endlsn.file, + (u_long)lr_args.endlsn.offset)); + } + /* + * There are several different cases here. + * 1. We asked logc_get for a particular LSN and got it. + * 2. We asked logc_get for an LSN and it's not found because it is + * beyond the end of a log file and we need a NEWFILE msg. + * and then the record that was requested. + * 3. We asked logc_get for an LSN and it is already archived. + * 4. We asked logc_get for an LSN and it simply doesn't exist, but + * doesn't meet any of those other criteria, in which case + * it's an error (that should never happen on a master). + * + * If we have a valid LSN and the request has a data_dbt with + * it, the sender is asking for a chunk of log records. + * Then we need to send all records up to the LSN in the data dbt. + */ + memset(&data_dbt, 0, sizeof(data_dbt)); + oldfilelsn = lsn = rp->lsn; + if ((ret = __log_cursor(env, &logc)) != 0) + return (ret); + REP_SYSTEM_LOCK(env); + if ((ret = __rep_lockout_archive(env, rep)) != 0) { + REP_SYSTEM_UNLOCK(env); + goto err; + } + REP_SYSTEM_UNLOCK(env); + if ((ret = __logc_get(logc, &lsn, &data_dbt, DB_SET)) == 0) { + /* Case 1 */ + (void)__rep_send_message(env, + eid, REP_LOG, &lsn, &data_dbt, REPCTL_RESEND, 0); + oldfilelsn.offset += logc->len; + } else if (ret == DB_NOTFOUND) { + /* + * If logc_get races with log_archive or the user removing + * files from an earlier call to log_archive, it might return + * DB_NOTFOUND. We expect there to be some log record + * that is the first one. Loop until we either get + * a log record or some error. Since we only expect + * to get this racing log file removal, bound it to a few + * tries. + */ + count = 0; + do { + ret = __logc_get(logc, &firstlsn, &data_dbt, DB_FIRST); + /* + * If we've raced this many tries and we're still + * getting DB_NOTFOUND, then pause a bit to disrupt + * the timing cycle that we appear to be in. + */ + if (count > 5) + __os_yield(env, 0, 50000); + count++; + } while (ret == DB_NOTFOUND && count < 10); + if (ret != 0) { + /* + * If we're master we don't want to return DB_NOTFOUND. + * We'll just ignore the error and this message. + * It will get rerequested if needed. + */ + if (ret == DB_NOTFOUND && F_ISSET(rep, REP_F_MASTER)) + ret = 0; + goto err; + } + if (LOG_COMPARE(&firstlsn, &rp->lsn) > 0) { + /* Case 3 */ + if (F_ISSET(rep, REP_F_CLIENT)) { + ret = DB_NOTFOUND; + goto err; + } + (void)__rep_send_message(env, eid, + REP_VERIFY_FAIL, &rp->lsn, NULL, 0, 0); + ret = 0; + goto err; + } + ret = __rep_chk_newfile(env, logc, rep, rp, eid); + if (ret == DB_NOTFOUND) { + /* Case 4 */ + /* + * If we still get DB_NOTFOUND the client gave us an + * unknown LSN, perhaps at the end of the log. Ignore + * it if we're the master. Return DB_NOTFOUND if + * we are the client. + */ + if (F_ISSET(rep, REP_F_MASTER)) { + __db_errx(env, DB_STR_A("3501", + "Request for LSN [%lu][%lu] not found", + "%lu %lu"), (u_long)rp->lsn.file, + (u_long)rp->lsn.offset); + ret = 0; + goto err; + } else + ret = DB_NOTFOUND; + } + } + + if (ret != 0) + goto err; + + /* + * If the user requested a gap, send the whole thing, while observing + * the limits from rep_set_limit. + * + * If we're doing bulk transfer, allocate a bulk buffer to put our + * log records in. We still need to initialize the throttle info + * because if we encounter a log record larger than our entire bulk + * buffer, we need to send it as a singleton. + * + * Use a local var so we don't need to worry if someone else turns + * on/off bulk in the middle of our call. + */ + use_bulk = FLD_ISSET(rep->config, REP_C_BULK); + if (use_bulk && (ret = __rep_bulk_alloc(env, &bulk, eid, + &bulkoff, &bulkflags, REP_BULK_LOG)) != 0) + goto err; + memset(&repth, 0, sizeof(repth)); + REP_SYSTEM_LOCK(env); + repth.gbytes = rep->gbytes; + repth.bytes = rep->bytes; + repth.type = REP_LOG; + repth.data_dbt = &data_dbt; + REP_SYSTEM_UNLOCK(env); + while (ret == 0 && rec != NULL && rec->size != 0 && + repth.type == REP_LOG) { + if ((ret = + __logc_get(logc, &repth.lsn, &data_dbt, DB_NEXT)) != 0) { + /* + * If we're a client and we only have part of the gap, + * return DB_NOTFOUND so that we send a REREQUEST + * back to the requester and it can ask for more. + */ + if (ret == DB_NOTFOUND && F_ISSET(rep, REP_F_MASTER)) + ret = 0; + break; + } + if (LOG_COMPARE(&repth.lsn, &lr_args.endlsn) >= 0) + break; + if (repth.lsn.file != oldfilelsn.file) { + if ((ret = __logc_version(logc, &nf_args.version)) != 0) + break; + memset(&newfiledbt, 0, sizeof(newfiledbt)); + if (rep->version < DB_REPVERSION_47) + DB_INIT_DBT(newfiledbt, &nf_args.version, + sizeof(nf_args.version)); + else { + if ((ret = __rep_newfile_marshal(env, &nf_args, + buf, __REP_NEWFILE_SIZE, &len)) != 0) + goto err; + DB_INIT_DBT(newfiledbt, buf, len); + } + (void)__rep_send_message(env, + eid, REP_NEWFILE, &oldfilelsn, &newfiledbt, + REPCTL_RESEND, 0); + } + /* + * If we are configured for bulk, try to send this as a bulk + * request. If not configured, or it is too big for bulk + * then just send normally. + */ + if (use_bulk) + ret = __rep_bulk_message(env, &bulk, &repth, + &repth.lsn, &data_dbt, REPCTL_RESEND); + if (!use_bulk || ret == DB_REP_BULKOVF) + ret = __rep_send_throttle(env, eid, &repth, 0, 0); + if (ret != 0) { + /* Ignore send failure, except to break the loop. */ + if (ret == DB_REP_UNAVAIL) + ret = 0; + break; + } + /* + * If we are about to change files, then we'll need the + * last LSN in the previous file. Save it here. + */ + oldfilelsn = repth.lsn; + oldfilelsn.offset += logc->len; + } + + /* + * We're done, force out whatever remains in the bulk buffer and + * free it. + */ + if (use_bulk && (t_ret = __rep_bulk_free(env, &bulk, + REPCTL_RESEND)) != 0 && ret == 0 && + t_ret != DB_REP_UNAVAIL) + ret = t_ret; +err: + /* + * We could have raced an unlink from an earlier log_archive + * and the user is removing the files themselves, now. If + * we get an error indicating the log file might no longer + * exist, ignore it. + */ + if (ret == ENOENT) + ret = 0; + REP_SYSTEM_LOCK(env); + FLD_CLR(rep->lockout_flags, REP_LOCKOUT_ARCHIVE); + REP_SYSTEM_UNLOCK(env); + if ((t_ret = __logc_close(logc)) != 0 && ret == 0) + ret = t_ret; + return (ret); +} + +/* + * __rep_loggap_req - + * Request a log gap. Assumes the caller holds the REP->mtx_clientdb. + * + * lsnp is the current LSN we're handling. It is used to help decide + * if we ask for a gap or singleton. + * gapflags are flags that may override the algorithm or control the + * processing in some way. + * + * PUBLIC: int __rep_loggap_req __P((ENV *, REP *, DB_LSN *, u_int32_t)); + */ +int +__rep_loggap_req(env, rep, lsnp, gapflags) + ENV *env; + REP *rep; + DB_LSN *lsnp; + u_int32_t gapflags; +{ + DBT max_lsn_dbt, *max_lsn_dbtp; + DB_LOG *dblp; + DB_LSN next_lsn; + LOG *lp; + __rep_logreq_args lr_args; + size_t len; + u_int32_t ctlflags, flags, type; + int master, ret; + u_int8_t buf[__REP_LOGREQ_SIZE]; + + dblp = env->lg_handle; + lp = dblp->reginfo.primary; + if (FLD_ISSET(gapflags, REP_GAP_FORCE)) + next_lsn = *lsnp; + else + next_lsn = lp->ready_lsn; + ctlflags = flags = 0; + type = REP_LOG_REQ; + ret = 0; + + /* + * Check if we need to ask for the gap. + * We ask for the gap if: + * We are forced to with gapflags. + * If max_wait_lsn is ZERO_LSN - we've never asked for + * records before. + * If we asked for a single record and received it. + * + * If we want a gap, but don't have an ending LSN (waiting_lsn) + * send an ALL_REQ. This is primarily used by REP_REREQUEST when + * an ALL_REQ was not able to be fulfilled by another client. + */ + if (FLD_ISSET(gapflags, (REP_GAP_FORCE | REP_GAP_REREQUEST)) || + IS_ZERO_LSN(lp->max_wait_lsn) || + (lsnp != NULL && LOG_COMPARE(lsnp, &lp->max_wait_lsn) == 0)) { + lp->max_wait_lsn = lp->waiting_lsn; + /* + * In SYNC_LOG, make sure max_wait_lsn is set to avoid sending + * an ALL_REQ that could create an unnecessary dual data stream. + */ + if (rep->sync_state == SYNC_LOG && + IS_ZERO_LSN(lp->max_wait_lsn)) + lp->max_wait_lsn = rep->last_lsn; + /* + * If we are forcing a gap, we need to send a max_wait_lsn + * that may be beyond the current gap/waiting_lsn (but + * it may not be). If we cannot determine any future + * waiting LSN, then it should be zero. If we're in + * internal init, it should be our ending LSN. + */ + if (FLD_ISSET(gapflags, REP_GAP_FORCE)) { + if (LOG_COMPARE(&lp->max_wait_lsn, lsnp) <= 0) { + if (rep->sync_state == SYNC_LOG) { + DB_ASSERT(env, LOG_COMPARE(lsnp, + &rep->last_lsn) <= 0); + lp->max_wait_lsn = rep->last_lsn; + } else + ZERO_LSN(lp->max_wait_lsn); + } + } + if (IS_ZERO_LSN(lp->max_wait_lsn)) + type = REP_ALL_REQ; + memset(&max_lsn_dbt, 0, sizeof(max_lsn_dbt)); + lr_args.endlsn = lp->max_wait_lsn; + if (rep->version < DB_REPVERSION_47) + DB_INIT_DBT(max_lsn_dbt, &lp->max_wait_lsn, + sizeof(DB_LSN)); + else { + if ((ret = __rep_logreq_marshal(env, &lr_args, buf, + __REP_LOGREQ_SIZE, &len)) != 0) + goto err; + DB_INIT_DBT(max_lsn_dbt, buf, len); + } + max_lsn_dbtp = &max_lsn_dbt; + /* + * Gap requests are "new" and can go anywhere, unless + * this is already a re-request. + */ + if (FLD_ISSET(gapflags, REP_GAP_REREQUEST)) + flags = DB_REP_REREQUEST; + else + flags = DB_REP_ANYWHERE; + } else { + max_lsn_dbtp = NULL; + lp->max_wait_lsn = next_lsn; + /* + * If we're dropping to singletons, this is a re-request. + */ + flags = DB_REP_REREQUEST; + } + if ((master = rep->master_id) != DB_EID_INVALID) { + STAT_INC(env, + rep, log_request, rep->stat.st_log_requested, master); + if (rep->sync_state == SYNC_LOG) + ctlflags = REPCTL_INIT; + (void)__rep_send_message(env, master, + type, &next_lsn, max_lsn_dbtp, ctlflags, flags); + } else + (void)__rep_send_message(env, DB_EID_BROADCAST, + REP_MASTER_REQ, NULL, NULL, 0, 0); +err: + return (ret); +} + +/* + * __rep_logready - + * Handle getting back REP_LOGREADY. Any call to __rep_apply + * can return it. + * + * PUBLIC: int __rep_logready __P((ENV *, REP *, time_t, DB_LSN *)); + */ +int +__rep_logready(env, rep, savetime, last_lsnp) + ENV *env; + REP *rep; + time_t savetime; + DB_LSN *last_lsnp; +{ + REGENV *renv; + REGINFO *infop; + int ret; + + infop = env->reginfo; + renv = infop->primary; + if ((ret = __log_flush(env, NULL)) != 0) + goto err; + if ((ret = __rep_verify_match(env, last_lsnp, savetime)) != 0) + goto err; + + REP_SYSTEM_LOCK(env); + ZERO_LSN(rep->first_lsn); + + if (rep->originfo_off != INVALID_ROFF) { + MUTEX_LOCK(env, renv->mtx_regenv); + __env_alloc_free(infop, R_ADDR(infop, rep->originfo_off)); + MUTEX_UNLOCK(env, renv->mtx_regenv); + rep->originfo_off = INVALID_ROFF; + } + + rep->sync_state = SYNC_OFF; + F_SET(rep, REP_F_NIMDBS_LOADED); + ret = __rep_notify_threads(env, AWAIT_NIMDB); + REP_SYSTEM_UNLOCK(env); + if (ret != 0) + goto err; + + return (0); + +err: + DB_ASSERT(env, ret != DB_REP_WOULDROLLBACK); + __db_errx(env, DB_STR("3502", + "Client initialization failed. Need to manually restore client")); + return (__env_panic(env, ret)); +} + +/* + * __rep_chk_newfile -- + * Determine if getting DB_NOTFOUND is because we're at the + * end of a log file and need to send a NEWFILE message. + * + * This function handles these cases: + * [Case 1 was that we found the record we were looking for - it + * is already handled by the caller.] + * 2. We asked logc_get for an LSN and it's not found because it is + * beyond the end of a log file and we need a NEWFILE msg. + * 3. We asked logc_get for an LSN and it simply doesn't exist, but + * doesn't meet any of those other criteria, in which case + * we return DB_NOTFOUND and the caller decides if it's an error. + * + * This function returns 0 if we had to send a message and the bad + * LSN is dealt with and DB_NOTFOUND if this really is an unknown LSN + * (on a client) and errors if it isn't found on the master. + */ +static int +__rep_chk_newfile(env, logc, rep, rp, eid) + ENV *env; + DB_LOGC *logc; + REP *rep; + __rep_control_args *rp; + int eid; +{ + DBT data_dbt, newfiledbt; + DB_LOG *dblp; + DB_LSN endlsn; + LOG *lp; + __rep_newfile_args nf_args; + int ret; + u_int8_t buf[__REP_NEWFILE_SIZE]; + size_t len; + + ret = 0; + dblp = env->lg_handle; + lp = dblp->reginfo.primary; + memset(&data_dbt, 0, sizeof(data_dbt)); + LOG_SYSTEM_LOCK(env); + endlsn = lp->lsn; + LOG_SYSTEM_UNLOCK(env); + if (endlsn.file > rp->lsn.file) { + /* + * Case 2: + * Need to find the LSN of the last record in + * file lsn.file so that we can send it with + * the NEWFILE call. In order to do that, we + * need to try to get {lsn.file + 1, 0} and + * then backup. + */ + endlsn.file = rp->lsn.file + 1; + endlsn.offset = 0; + if ((ret = __logc_get(logc, + &endlsn, &data_dbt, DB_SET)) != 0 || + (ret = __logc_get(logc, + &endlsn, &data_dbt, DB_PREV)) != 0) { + RPRINT(env, (env, DB_VERB_REP_MISC, + "Unable to get prev of [%lu][%lu]", + (u_long)rp->lsn.file, + (u_long)rp->lsn.offset)); + /* + * We want to push the error back + * to the client so that the client + * does an internal backup. The + * client asked for a log record + * we no longer have and it is + * outdated. + * XXX - This could be optimized by + * having the master perform and + * send a REP_UPDATE message. We + * currently want the client to set + * up its 'update' state prior to + * requesting REP_UPDATE_REQ. + * + * If we're a client servicing a request + * just return DB_NOTFOUND. + */ + if (F_ISSET(rep, REP_F_MASTER)) { + ret = 0; + (void)__rep_send_message(env, eid, + REP_VERIFY_FAIL, &rp->lsn, + NULL, 0, 0); + } else + ret = DB_NOTFOUND; + } else { + endlsn.offset += logc->len; + if ((ret = __logc_version(logc, + &nf_args.version)) == 0) { + memset(&newfiledbt, 0, + sizeof(newfiledbt)); + if (rep->version < DB_REPVERSION_47) + DB_INIT_DBT(newfiledbt, + &nf_args.version, + sizeof(nf_args.version)); + else { + if ((ret = __rep_newfile_marshal(env, + &nf_args, buf, __REP_NEWFILE_SIZE, + &len)) != 0) + return (ret); + DB_INIT_DBT(newfiledbt, buf, len); + } + (void)__rep_send_message(env, eid, + REP_NEWFILE, &endlsn, + &newfiledbt, REPCTL_RESEND, 0); + } + } + } else + ret = DB_NOTFOUND; + + return (ret); +} |