diff options
author | Eric Milkie <milkie@10gen.com> | 2015-10-21 09:21:58 -0400 |
---|---|---|
committer | Eric Milkie <milkie@10gen.com> | 2015-10-23 15:20:44 -0400 |
commit | 9997b730eaed62b0d5da9b624cf455555b685b9c (patch) | |
tree | 821d1aa8fb295c53fa726e94f625b40b9cab2598 | |
parent | 5ccffd3e136b8423ad75568dea3c6b42bbe4475e (diff) | |
download | mongo-9997b730eaed62b0d5da9b624cf455555b685b9c.tar.gz |
SERVER-21028 Plug a race when waiting for new snapshots as part of read concern level majority.
-rw-r--r-- | src/mongo/db/db_raii.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/dbcommands.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator.h | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 60 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.h | 9 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_mock.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_mock.h | 3 | ||||
-rw-r--r-- | src/mongo/db/storage/recovery_unit.h | 2 |
8 files changed, 58 insertions, 45 deletions
diff --git a/src/mongo/db/db_raii.cpp b/src/mongo/db/db_raii.cpp index 0d203651f10..ce0260e677d 100644 --- a/src/mongo/db/db_raii.cpp +++ b/src/mongo/db/db_raii.cpp @@ -130,7 +130,7 @@ void AutoGetCollectionForRead::_ensureMajorityCommittedSnapshotIsValid(const Nam // Yield locks. _autoColl = {}; - repl::ReplicationCoordinator::get(_txn)->waitForNewSnapshot(_txn); + repl::ReplicationCoordinator::get(_txn)->waitUntilSnapshotCommitted(_txn, *minSnapshot); uassertStatusOK(_txn->recoveryUnit()->setReadFromMajorityCommittedSnapshot()); diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp index 9d18c8269e8..8fff85f24d5 100644 --- a/src/mongo/db/dbcommands.cpp +++ b/src/mongo/db/dbcommands.cpp @@ -1318,10 +1318,10 @@ bool Command::run(OperationContext* txn, repl::ReplicationCoordinator* replCoord = repl::getGlobalReplicationCoordinator(); - repl::ReadConcernArgs readConcern; + repl::ReadConcernArgs readConcernArgs; { // parse and validate ReadConcernArgs - auto readConcernParseStatus = readConcern.initialize(request.getCommandArgs()); + auto readConcernParseStatus = readConcernArgs.initialize(request.getCommandArgs()); if (!readConcernParseStatus.isOK()) { replyBuilder->setMetadata(rpc::makeEmptyMetadata()) .setCommandReply(readConcernParseStatus); @@ -1331,8 +1331,8 @@ bool Command::run(OperationContext* txn, if (!supportsReadConcern()) { // Only return an error if a non-nullish readConcern was parsed, but do not process // readConcern regardless. - if (!readConcern.getOpTime().isNull() || - readConcern.getLevel() != repl::ReadConcernLevel::kLocalReadConcern) { + if (!readConcernArgs.getOpTime().isNull() || + readConcernArgs.getLevel() != repl::ReadConcernLevel::kLocalReadConcern) { replyBuilder->setMetadata(rpc::makeEmptyMetadata()) .setCommandReply({ErrorCodes::InvalidOptions, str::stream() @@ -1344,7 +1344,7 @@ bool Command::run(OperationContext* txn, // Skip waiting for the OpTime when testing snapshot behavior. if (!testingSnapshotBehaviorInIsolation) { // Wait for readConcern to be satisfied. - auto readConcernResult = replCoord->waitUntilOpTime(txn, readConcern); + auto readConcernResult = replCoord->waitUntilOpTime(txn, readConcernArgs); readConcernResult.appendInfo(&replyBuilderBob); if (!readConcernResult.getStatus().isOK()) { replyBuilder->setMetadata(rpc::makeEmptyMetadata()) @@ -1356,12 +1356,12 @@ bool Command::run(OperationContext* txn, if ((replCoord->getReplicationMode() == repl::ReplicationCoordinator::Mode::modeReplSet || testingSnapshotBehaviorInIsolation) && - readConcern.getLevel() == repl::ReadConcernLevel::kMajorityReadConcern) { + readConcernArgs.getLevel() == repl::ReadConcernLevel::kMajorityReadConcern) { Status status = txn->recoveryUnit()->setReadFromMajorityCommittedSnapshot(); // Wait until a snapshot is available. while (status == ErrorCodes::ReadConcernMajorityNotAvailableYet) { - replCoord->waitForNewSnapshot(txn); + replCoord->waitUntilSnapshotCommitted(txn, SnapshotName::min()); status = txn->recoveryUnit()->setReadFromMajorityCommittedSnapshot(); } @@ -1392,7 +1392,7 @@ bool Command::run(OperationContext* txn, repl::OpTime lastOpTimeFromClient = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(); replCoord->prepareReplResponseMetadata( - request, lastOpTimeFromClient, readConcern, &metadataBob); + request, lastOpTimeFromClient, readConcernArgs, &metadataBob); // For commands from mongos, append some info to help getLastError(w) work. // TODO: refactor out of here as part of SERVER-18326 diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index c9efff9a7ab..2c0d88a8277 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -703,10 +703,12 @@ public: virtual void onSnapshotCreate(OpTime timeOfSnapshot, SnapshotName name) = 0; /** - * Called by threads that wish to be alerted of the creation of a new snapshot. "txn" is used - * to checkForInterrupt and enforce maxTimeMS. + * Blocks until either the current committed snapshot is at least as high as 'untilSnapshot', + * or we are interrupted for any reason, including shutdown or maxTimeMs expiration. + * 'txn' is used to checkForInterrupt and enforce maxTimeMS. */ - virtual void waitForNewSnapshot(OperationContext* txn) = 0; + virtual void waitUntilSnapshotCommitted(OperationContext* txn, + const SnapshotName& untilSnapshot) = 0; /** * Resets all information related to snapshotting. diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 98f0e52d107..ec575a7d40c 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -673,6 +673,12 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) { fassert(28665, lastOpTime.getStatus().isOK()); _setFirstOpTimeOfMyTerm(lastOpTime.getValue()); + lk.lock(); + // Must calculate the commit level again because firstOpTimeOfMyTerm wasn't set when we logged + // our election in logTransitionToPrimaryToOplog(), above. + _updateLastCommittedOpTime_inlock(); + lk.unlock(); + log() << "transition to primary complete; database writes are now permitted" << rsLog; } @@ -892,23 +898,18 @@ ReadConcernResponse ReplicationCoordinatorImpl::waitUntilOpTime(OperationContext "--enableMajorityReadConcern.")); } - const auto ts = settings.getOpTime(); - // Note that if 'settings' has no explicit after-optime, 'ts' will be the earliest - // possible optime, which means the comparisons with 'ts' below are always false. This is - // intentional. + const auto targetOpTime = settings.getOpTime(); + if (targetOpTime.isNull()) { + return ReadConcernResponse(Status::OK(), Milliseconds(0)); + } if (getReplicationMode() != repl::ReplicationCoordinator::modeReplSet) { // For master/slave and standalone nodes, readAfterOpTime is not supported, so we return // an error. However, we consider all writes "committed" and can treat MajorityReadConcern // as LocalReadConcern, which is immediately satisfied since there is no OpTime to wait for. - if (!ts.isNull()) { - return ReadConcernResponse( - Status(ErrorCodes::NotAReplicaSet, - "node needs to be a replica set member to use read concern")); - - } else { - return ReadConcernResponse(Status::OK(), Milliseconds(0)); - } + return ReadConcernResponse( + Status(ErrorCodes::NotAReplicaSet, + "node needs to be a replica set member to use read concern")); } Timer timer; @@ -919,10 +920,10 @@ ReadConcernResponse ReplicationCoordinatorImpl::waitUntilOpTime(OperationContext "Current storage engine does not support majority readConcerns")); } - auto loopCondition = [this, isMajorityReadConcern, ts] { + auto loopCondition = [this, isMajorityReadConcern, targetOpTime] { return isMajorityReadConcern - ? !_currentCommittedSnapshot || ts > _currentCommittedSnapshot->opTime - : ts > _getMyLastOptime_inlock(); + ? !_currentCommittedSnapshot || targetOpTime > _currentCommittedSnapshot->opTime + : targetOpTime > _getMyLastOptime_inlock(); }; while (loopCondition()) { @@ -942,7 +943,7 @@ ReadConcernResponse ReplicationCoordinatorImpl::waitUntilOpTime(OperationContext WaiterInfo waitInfo(isMajorityReadConcern ? &_replicationWaiterList : &_opTimeWaiterList, txn->getOpID(), - &ts, + &targetOpTime, isMajorityReadConcern ? &writeConcern : nullptr, &condVar); @@ -1037,8 +1038,8 @@ Status ReplicationCoordinatorImpl::_setLastOptime_inlock(const UpdatePositionArg void ReplicationCoordinatorImpl::interrupt(unsigned opId) { stdx::lock_guard<stdx::mutex> lk(_mutex); - // Wake ops waiting for a new snapshot. - _snapshotCreatedCond.notify_all(); + // Wake ops waiting for a new committed snapshot. + _currentCommittedSnapshotCond.notify_all(); for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin(); it != _replicationWaiterList.end(); @@ -1062,8 +1063,8 @@ void ReplicationCoordinatorImpl::interrupt(unsigned opId) { void ReplicationCoordinatorImpl::interruptAll() { stdx::lock_guard<stdx::mutex> lk(_mutex); - // Wake ops waiting for a new snapshot. - _snapshotCreatedCond.notify_all(); + // Wake ops waiting for a new committed snapshot. + _currentCommittedSnapshotCond.notify_all(); for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin(); it != _replicationWaiterList.end(); @@ -2781,13 +2782,11 @@ void ReplicationCoordinatorImpl::_updateLastCommittedOpTime_inlock() { if (votingNodesOpTimes.size() < static_cast<unsigned long>(_rsConfig.getWriteMajority())) { return; } - std::sort(votingNodesOpTimes.begin(), votingNodesOpTimes.end()); // need the majority to have this OpTime OpTime committedOpTime = votingNodesOpTimes[votingNodesOpTimes.size() - _rsConfig.getWriteMajority()]; - _setLastCommittedOpTime_inlock(committedOpTime); } @@ -3166,17 +3165,25 @@ void ReplicationCoordinatorImpl::forceSnapshotCreation() { _externalState->forceSnapshotCreation(); } -void ReplicationCoordinatorImpl::waitForNewSnapshot(OperationContext* txn) { +void ReplicationCoordinatorImpl::waitUntilSnapshotCommitted(OperationContext* txn, + const SnapshotName& untilSnapshot) { stdx::unique_lock<stdx::mutex> lock(_mutex); - _snapshotCreatedCond.wait_for(lock, Microseconds(txn->getRemainingMaxTimeMicros())); - txn->checkForInterrupt(); + + while (!_currentCommittedSnapshot || _currentCommittedSnapshot->name < untilSnapshot) { + Microseconds waitTime(txn->getRemainingMaxTimeMicros()); + if (waitTime == Microseconds(0)) { + _currentCommittedSnapshotCond.wait(lock); + } else { + _currentCommittedSnapshotCond.wait_for(lock, waitTime); + } + txn->checkForInterrupt(); + } } void ReplicationCoordinatorImpl::onSnapshotCreate(OpTime timeOfSnapshot, SnapshotName name) { stdx::lock_guard<stdx::mutex> lock(_mutex); auto snapshotInfo = SnapshotInfo{timeOfSnapshot, name}; - _snapshotCreatedCond.notify_all(); if (timeOfSnapshot <= _lastCommittedOpTime) { // This snapshot is ready to be marked as committed. @@ -3208,6 +3215,7 @@ void ReplicationCoordinatorImpl::_updateCommittedSnapshot_inlock( invariant(newCommittedSnapshot < _uncommittedSnapshots.front()); _currentCommittedSnapshot = newCommittedSnapshot; + _currentCommittedSnapshotCond.notify_all(); _externalState->updateCommittedSnapshot(newCommittedSnapshot.name); diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index a8b11b2a727..05c594d92ee 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -300,7 +300,8 @@ public: virtual OpTime getCurrentCommittedSnapshotOpTime() override; - virtual void waitForNewSnapshot(OperationContext* txn) override; + virtual void waitUntilSnapshotCommitted(OperationContext* txn, + const SnapshotName& untilSnapshot) override; virtual void appendConnectionStats(BSONObjBuilder* b) override; @@ -1256,9 +1257,6 @@ private: // Used to signal threads waiting for changes to _rsConfigState. stdx::condition_variable _rsConfigStateChange; // (M) - // Used to signal threads that are waiting for new snapshots. - stdx::condition_variable _snapshotCreatedCond; // (M) - // Represents the configuration state of the coordinator, which controls how and when // _rsConfig may change. See the state transition diagram in the type definition of // ConfigState for details. @@ -1335,6 +1333,9 @@ private: // When engaged, this must be <= _lastCommittedOpTime and < _uncommittedSnapshots.front(). boost::optional<SnapshotInfo> _currentCommittedSnapshot; // (M) + // Used to signal threads that are waiting for new committed snapshots. + stdx::condition_variable _currentCommittedSnapshotCond; // (M) + // The cached current term. It's in sync with the term in topology coordinator. long long _cachedTerm = OpTime::kUninitializedTerm; // (M) diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 077e64b3c36..312eaf0a257 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -391,7 +391,8 @@ OpTime ReplicationCoordinatorMock::getCurrentCommittedSnapshotOpTime() { return OpTime(); } -void ReplicationCoordinatorMock::waitForNewSnapshot(OperationContext* txn) {} +void ReplicationCoordinatorMock::waitUntilSnapshotCommitted(OperationContext* txn, + const SnapshotName& untilSnapshot) {} } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 8373f79dae7..d6050d634a0 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -228,7 +228,8 @@ public: virtual OpTime getCurrentCommittedSnapshotOpTime() override; - virtual void waitForNewSnapshot(OperationContext* txn) override; + virtual void waitUntilSnapshotCommitted(OperationContext* txn, + const SnapshotName& untilSnapshot) override; private: AtomicUInt64 _snapshotNameGenerator; diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h index f2bafcc944b..47e25f94dc1 100644 --- a/src/mongo/db/storage/recovery_unit.h +++ b/src/mongo/db/storage/recovery_unit.h @@ -94,7 +94,7 @@ public: * change snapshots. * * If no snapshot has yet been marked as Majority Committed, returns a status with error - * code ReadConcernNotAvailableYet. After this returns successfully, at any point where + * code ReadConcernMajorityNotAvailableYet. After this returns successfully, at any point where * implementations attempt to acquire committed snapshot, if there are none available due to a * call to SnapshotManager::dropAllSnapshots(), a UserException with the same code should be * thrown. |