diff options
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/repl/minvalid.cpp | 48 | ||||
-rw-r--r-- | src/mongo/db/repl/minvalid.h | 53 | ||||
-rw-r--r-- | src/mongo/db/repl/oplogreader.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/optime.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/optime.h | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_rollback.cpp | 20 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_rollback_test.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 31 |
8 files changed, 124 insertions, 42 deletions
diff --git a/src/mongo/db/repl/minvalid.cpp b/src/mongo/db/repl/minvalid.cpp index 7860d4ac988..bcdc5b5a404 100644 --- a/src/mongo/db/repl/minvalid.cpp +++ b/src/mongo/db/repl/minvalid.cpp @@ -46,9 +46,10 @@ namespace mongo { namespace repl { namespace { -const char* initialSyncFlagString = "doingInitialSync"; +const char initialSyncFlagString[] = "doingInitialSync"; const BSONObj initialSyncFlag(BSON(initialSyncFlagString << true)); -const char* minvalidNS = "local.replset.minvalid"; +const char minvalidNS[] = "local.replset.minvalid"; +const char beginFieldName[] = "begin"; } // namespace // Writes @@ -60,6 +61,7 @@ void clearInitialSyncFlag(OperationContext* txn) { Helpers::putSingleton(txn, minvalidNS, BSON("$unset" << initialSyncFlag)); } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "clearInitialSyncFlags", minvalidNS); + LOG(3) << "clearing initial sync flag"; } void setInitialSyncFlag(OperationContext* txn) { @@ -69,18 +71,38 @@ void setInitialSyncFlag(OperationContext* txn) { Helpers::putSingleton(txn, minvalidNS, BSON("$set" << initialSyncFlag)); } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "setInitialSyncFlags", minvalidNS); + LOG(3) << "setting initial sync flag"; } -void setMinValid(OperationContext* ctx, const OpTime& opTime) { +void setMinValid(OperationContext* ctx, const OpTime& endOpTime) { MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { ScopedTransaction transaction(ctx, MODE_IX); Lock::DBLock dblk(ctx->lockState(), "local", MODE_X); Helpers::putSingleton( ctx, minvalidNS, - BSON("$set" << BSON("ts" << opTime.getTimestamp() << "t" << opTime.getTerm()))); + BSON("$set" << BSON("ts" << endOpTime.getTimestamp() << "t" << endOpTime.getTerm()) + << "$unset" << BSON(beginFieldName << 1))); } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(ctx, "setMinValid", minvalidNS); + LOG(3) << "setting minvalid: " << endOpTime.toString() << "(" << endOpTime.toBSON() << ")"; +} + +void setMinValid(OperationContext* ctx, const BatchBoundaries& boundaries) { + const OpTime& start(boundaries.start); + const OpTime& end(boundaries.end); + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + ScopedTransaction transaction(ctx, MODE_IX); + Lock::DBLock dblk(ctx->lockState(), "local", MODE_X); + Helpers::putSingleton(ctx, + minvalidNS, + BSON("$set" << BSON("ts" << end.getTimestamp() << "t" << end.getTerm() + << beginFieldName << start.toBSON()))); + } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(ctx, "setMinValid", minvalidNS); + LOG(3) << "setting minvalid: " << boundaries.start.toString() << "(" + << boundaries.start.toBSON() << ") -> " << boundaries.end.toString() << "(" + << boundaries.end.toBSON() << ")"; } // Reads @@ -94,8 +116,11 @@ bool getInitialSyncFlag() { bool found = Helpers::getSingleton(&txn, minvalidNS, mv); if (found) { - return mv[initialSyncFlagString].trueValue(); + const auto flag = mv[initialSyncFlagString].trueValue(); + LOG(3) << "return initial flag value of " << flag; + return flag; } + LOG(3) << "return initial flag value of false"; return false; } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(&txn, "getInitialSyncFlags", minvalidNS); @@ -103,7 +128,7 @@ bool getInitialSyncFlag() { MONGO_UNREACHABLE; } -OpTime getMinValid(OperationContext* txn) { +BatchBoundaries getMinValid(OperationContext* txn) { MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { ScopedTransaction transaction(txn, MODE_IS); Lock::DBLock dblk(txn->lockState(), "local", MODE_IS); @@ -111,9 +136,16 @@ OpTime getMinValid(OperationContext* txn) { BSONObj mv; bool found = Helpers::getSingleton(txn, minvalidNS, mv); if (found) { - return fassertStatusOK(28771, OpTime::parseFromOplogEntry(mv)); + auto status = OpTime::parseFromOplogEntry(mv.getObjectField(beginFieldName)); + OpTime start(status.isOK() ? status.getValue() : OpTime{}); + OpTime end(fassertStatusOK(28771, OpTime::parseFromOplogEntry(mv))); + LOG(3) << "returning minvalid: " << start.toString() << "(" << start.toBSON() << ") -> " + << end.toString() << "(" << end.toBSON() << ")"; + + return BatchBoundaries(start, end); } - return OpTime(); + LOG(3) << "returning empty minvalid"; + return BatchBoundaries{OpTime{}, OpTime{}}; } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "getMinValid", minvalidNS); } diff --git a/src/mongo/db/repl/minvalid.h b/src/mongo/db/repl/minvalid.h index 8d0ceb79613..482a7cdc72e 100644 --- a/src/mongo/db/repl/minvalid.h +++ b/src/mongo/db/repl/minvalid.h @@ -28,40 +28,67 @@ #pragma once +#include "mongo/db/repl/optime.h" + namespace mongo { class BSONObj; class OperationContext; namespace repl { -class OpTime; + +struct BatchBoundaries { + BatchBoundaries(const OpTime s, const OpTime e) : start(s), end(e) {} + const OpTime start; + const OpTime end; +}; /** - * Helper functions for maintaining local.replset.minvalid collection contents. + * Helper functions for maintaining a single document in the local.replset.minvalid collection. * * When a member reaches its minValid optime it is in a consistent state. Thus, minValid is - * set as the last step in initial sync. At the beginning of initial sync, _initialSyncFlag + * set as the last step in initial sync. At the beginning of initial sync, doingInitialSync * is appended onto minValid to indicate that initial sync was started but has not yet * completed. - * minValid is also used during "normal" sync: the last op in each batch is used to set - * minValid, to indicate that we are in a consistent state when the batch has been fully - * applied. + * + * The document is also updated during "normal" sync. The optime of the last op in each batch is + * used to set minValid, along with a "begin" field to demark the start and the fact that a batch + * is active. When the batch is done the "begin" field is removed to indicate that we are in a + * consistent state when the batch has been fully applied. + * + * Example of all fields: + * { _id:..., + * doingInitialSync: true // initial sync is active + * ts:..., t:... // end-OpTime + * begin: {ts:..., t:...} // a batch is currently being applied, and not consistent + * } */ /** - * The initial sync flag is used to durably record the state of an initial sync; its boolean - * value is true when an initial sync is in progress and hasn't yet completed. The flag - * is stored as part of the local.replset.minvalid collection. + * The initial sync flag is used to durably record that initial sync has not completed. */ void clearInitialSyncFlag(OperationContext* txn); void setInitialSyncFlag(OperationContext* txn); bool getInitialSyncFlag(); + +/** + * Returns the bounds of the current apply batch, if active. If start is null/missing, and + * end is equal to the last oplog entry then we are in a consistent state and ready for reads. + */ +BatchBoundaries getMinValid(OperationContext* txn); + /** * The minValid value is the earliest (minimum) Timestamp that must be applied in order to - * consider the dataset consistent. Do not allow client reads if our last applied operation is - * before the minValid time. + * consider the dataset consistent. + * + * This is called when a batch finishes. + */ +void setMinValid(OperationContext* ctx, const OpTime& endOpTime); + +/** + * The bounds indicate an apply is active and we are not in a consistent state to allow reads + * or transition from a non-visible state to primary/secondary. */ -void setMinValid(OperationContext* ctx, const OpTime& opTime); -OpTime getMinValid(OperationContext* txn); +void setMinValid(OperationContext* ctx, const BatchBoundaries& boundaries); } } diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp index ecda101a91f..4ba200694a6 100644 --- a/src/mongo/db/repl/oplogreader.cpp +++ b/src/mongo/db/repl/oplogreader.cpp @@ -165,7 +165,7 @@ void OplogReader::connectToSyncSource(OperationContext* txn, log() << "our last optime : " << lastOpTimeFetched; log() << "oldest available is " << oldestOpTimeSeen; log() << "See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember"; - setMinValid(txn, oldestOpTimeSeen); + setMinValid(txn, {lastOpTimeFetched, oldestOpTimeSeen}); bool worked = replCoord->setFollowerMode(MemberState::RS_RECOVERING); if (!worked) { warning() << "Failed to transition into " << MemberState(MemberState::RS_RECOVERING) diff --git a/src/mongo/db/repl/optime.cpp b/src/mongo/db/repl/optime.cpp index 92a3c5556ca..53474c9ba60 100644 --- a/src/mongo/db/repl/optime.cpp +++ b/src/mongo/db/repl/optime.cpp @@ -82,6 +82,13 @@ StatusWith<OpTime> OpTime::parseFromOplogEntry(const BSONObj& obj) { return OpTime(ts, term); } +BSONObj OpTime::toBSON() const { + BSONObjBuilder bldr; + bldr.append(kTimestampFieldName, _timestamp); + bldr.append(kTermFieldName, _term); + return bldr.obj(); +} + std::string OpTime::toString() const { std::stringstream ss; ss << "(term: " << _term << ", timestamp: " << _timestamp.toStringPretty() << ")"; diff --git a/src/mongo/db/repl/optime.h b/src/mongo/db/repl/optime.h index 0af26bc8250..8e3e140db68 100644 --- a/src/mongo/db/repl/optime.h +++ b/src/mongo/db/repl/optime.h @@ -76,6 +76,7 @@ public: * subObjName : { ts: <timestamp>, t: <term> } */ void append(BSONObjBuilder* builder, const std::string& subObjName) const; + BSONObj toBSON() const; static StatusWith<OpTime> parseFromOplogEntry(const BSONObj& obj); diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index 3f0019f6462..38b122d745b 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -372,7 +372,7 @@ void syncFixUp(OperationContext* txn, // online until we get to that point in freshness. OpTime minValid = fassertStatusOK(28774, OpTime::parseFromOplogEntry(newMinValid)); log() << "minvalid=" << minValid; - setMinValid(txn, minValid); + setMinValid(txn, {OpTime{}, minValid}); // any full collection resyncs required? if (!fixUpInfo.collectionsToResyncData.empty() || @@ -476,7 +476,8 @@ void syncFixUp(OperationContext* txn, } else { OpTime minValid = fassertStatusOK(28775, OpTime::parseFromOplogEntry(newMinValid)); log() << "minvalid=" << minValid; - setMinValid(txn, minValid); + const OpTime start{fixUpInfo.commonPoint, OpTime::kUninitializedTerm}; + setMinValid(txn, {start, minValid}); } } catch (const DBException& e) { err = "can't get/set minvalid: "; @@ -812,13 +813,14 @@ Status _syncRollback(OperationContext* txn, auto res = syncRollBackLocalOperations( localOplog, rollbackSource.getOplog(), processOperationForFixUp); if (!res.isOK()) { - switch (res.getStatus().code()) { + const auto status = res.getStatus(); + switch (status.code()) { case ErrorCodes::OplogStartMissing: case ErrorCodes::UnrecoverableRollbackError: sleepSecondsFn(Seconds(1)); - return res.getStatus(); + return status; default: - throw RSFatalException(res.getStatus().toString()); + throw RSFatalException(status.toString()); } } else { how.commonPoint = res.getValue().first; @@ -887,13 +889,13 @@ Status syncRollback(OperationContext* txn, // check that we are at minvalid, otherwise we cannot rollback as we may be in an // inconsistent state { - OpTime minvalid = getMinValid(txn); - if (minvalid > lastOpTimeApplied) { + BatchBoundaries boundaries = getMinValid(txn); + if (!boundaries.start.isNull() || boundaries.end > lastOpTimeApplied) { severe() << "need to rollback, but in inconsistent state" << endl; return Status(ErrorCodes::UnrecoverableRollbackError, str::stream() << "need to rollback, but in inconsistent state. " - << "minvalid: " << minvalid.toString() - << " our last optime: " << lastOpTimeApplied.toString(), + << "minvalid: " << boundaries.end.toString() + << " > our last optime: " << lastOpTimeApplied.toString(), 18750); } } diff --git a/src/mongo/db/repl/rs_rollback_test.cpp b/src/mongo/db/repl/rs_rollback_test.cpp index 31f0b59ce41..cb6dc66e39c 100644 --- a/src/mongo/db/repl/rs_rollback_test.cpp +++ b/src/mongo/db/repl/rs_rollback_test.cpp @@ -154,6 +154,7 @@ void RSRollbackTest::setUp() { setGlobalReplicationCoordinator(_coordinator); setOplogCollectionName(); + repl::setMinValid(_txn.get(), {OpTime{}, OpTime{}}); } void RSRollbackTest::tearDown() { @@ -169,7 +170,8 @@ void RSRollbackTest::tearDown() { void noSleep(Seconds seconds) {} TEST_F(RSRollbackTest, InconsistentMinValid) { - repl::setMinValid(_txn.get(), OpTime(Timestamp(Seconds(1), 0), 0)); + repl::setMinValid(_txn.get(), + {OpTime(Timestamp(Seconds(0), 0), 0), OpTime(Timestamp(Seconds(1), 0), 0)}); auto status = syncRollback(_txn.get(), OpTime(), OplogInterfaceMock(kEmptyMockOperations), diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index ed023e1b9d6..f5e545fa31b 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -40,15 +40,15 @@ #include "mongo/base/counter.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/catalog/collection.h" -#include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" +#include "mongo/db/catalog/database.h" #include "mongo/db/catalog/document_validation.h" #include "mongo/db/commands/fsync.h" #include "mongo/db/commands/server_status_metric.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop.h" #include "mongo/db/db_raii.h" -#include "mongo/db/service_context.h" +#include "mongo/db/global_timestamp.h" #include "mongo/db/operation_context_impl.h" #include "mongo/db/prefetch.h" #include "mongo/db/repl/bgsync.h" @@ -59,6 +59,7 @@ #include "mongo/db/repl/replica_set_config.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/server_parameters.h" +#include "mongo/db/service_context.h" #include "mongo/db/stats/timer_stats.h" #include "mongo/util/exit.h" #include "mongo/util/fail_point_service.h" @@ -502,8 +503,8 @@ void tryToGoLiveAsASecondary(OperationContext* txn, ReplicationCoordinator* repl return; } - OpTime minvalid = getMinValid(txn); - if (minvalid > replCoord->getMyLastOptime()) { + BatchBoundaries boundaries = getMinValid(txn); + if (!boundaries.start.isNull() || boundaries.end > replCoord->getMyLastOptime()) { return; } @@ -519,9 +520,10 @@ void tryToGoLiveAsASecondary(OperationContext* txn, ReplicationCoordinator* repl void SyncTail::oplogApplication() { ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); + OperationContextImpl txn; + OpTime originalEndOpTime(getMinValid(&txn).end); while (!inShutdown()) { OpQueue ops; - OperationContextImpl txn; Timer batchTimer; int lastTimeChecked = 0; @@ -591,10 +593,18 @@ void SyncTail::oplogApplication() { const BSONObj lastOp = ops.back(); handleSlaveDelay(lastOp); - // Set minValid to the last op to be applied in this next batch. + // Set minValid to the last OpTime to be applied in this batch. // This will cause this node to go into RECOVERING state - // if we should crash and restart before updating the oplog - setMinValid(&txn, fassertStatusOK(28773, OpTime::parseFromOplogEntry(lastOp))); + // if we should crash and restart before updating finishing. + const OpTime start(getLastSetTimestamp(), OpTime::kUninitializedTerm); + + // Take the max of the first endOptime (if we recovered) and the end of our batch. + const auto lastOpTime = fassertStatusOK(28773, OpTime::parseFromOplogEntry(lastOp)); + const OpTime end(std::max(originalEndOpTime, lastOpTime)); + + // This write will not journal/checkpoint. + setMinValid(&txn, {start, end}); + multiApply(&txn, ops, &_prefetcherPool, @@ -602,6 +612,9 @@ void SyncTail::oplogApplication() { _applyFunc, this, supportsWaitingUntilDurable()); + + // This write will journal/checkpoint, and finish the batch. + setMinValid(&txn, end); } } @@ -823,8 +836,6 @@ bool SyncTail::shouldRetry(OperationContext* txn, const BSONObj& o) { MONGO_UNREACHABLE; } -static AtomicUInt32 replWriterWorkerId; - static void initializeWriterThread() { // Only do this once per thread if (!ClientBasic::getCurrent()) { |