summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Eric Milkie <milkie@10gen.com> 2015-10-21 09:21:58 -0400
committer: Eric Milkie <milkie@10gen.com> 2015-10-23 15:20:44 -0400
commit: 9997b730eaed62b0d5da9b624cf455555b685b9c (patch)
tree: 821d1aa8fb295c53fa726e94f625b40b9cab2598
parent: 5ccffd3e136b8423ad75568dea3c6b42bbe4475e (diff)
download: mongo-9997b730eaed62b0d5da9b624cf455555b685b9c.tar.gz
SERVER-21028 Plug a race when waiting for new snapshots as part of read concern level majority.
-rw-r--r-- src/mongo/db/db_raii.cpp | 2
-rw-r--r-- src/mongo/db/dbcommands.cpp | 16
-rw-r--r-- src/mongo/db/repl/replication_coordinator.h | 8
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl.cpp | 60
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl.h | 9
-rw-r--r-- src/mongo/db/repl/replication_coordinator_mock.cpp | 3
-rw-r--r-- src/mongo/db/repl/replication_coordinator_mock.h | 3
-rw-r--r-- src/mongo/db/storage/recovery_unit.h | 2
8 files changed, 58 insertions, 45 deletions
diff --git a/src/mongo/db/db_raii.cpp b/src/mongo/db/db_raii.cpp
index 0d203651f10..ce0260e677d 100644
--- a/src/mongo/db/db_raii.cpp
+++ b/src/mongo/db/db_raii.cpp
@@ -130,7 +130,7 @@ void AutoGetCollectionForRead::_ensureMajorityCommittedSnapshotIsValid(const Nam
// Yield locks.
_autoColl = {};
- repl::ReplicationCoordinator::get(_txn)->waitForNewSnapshot(_txn);
+ repl::ReplicationCoordinator::get(_txn)->waitUntilSnapshotCommitted(_txn, *minSnapshot);
uassertStatusOK(_txn->recoveryUnit()->setReadFromMajorityCommittedSnapshot());
diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp
index 9d18c8269e8..8fff85f24d5 100644
--- a/src/mongo/db/dbcommands.cpp
+++ b/src/mongo/db/dbcommands.cpp
@@ -1318,10 +1318,10 @@ bool Command::run(OperationContext* txn,
repl::ReplicationCoordinator* replCoord = repl::getGlobalReplicationCoordinator();
- repl::ReadConcernArgs readConcern;
+ repl::ReadConcernArgs readConcernArgs;
{
// parse and validate ReadConcernArgs
- auto readConcernParseStatus = readConcern.initialize(request.getCommandArgs());
+ auto readConcernParseStatus = readConcernArgs.initialize(request.getCommandArgs());
if (!readConcernParseStatus.isOK()) {
replyBuilder->setMetadata(rpc::makeEmptyMetadata())
.setCommandReply(readConcernParseStatus);
@@ -1331,8 +1331,8 @@ bool Command::run(OperationContext* txn,
if (!supportsReadConcern()) {
// Only return an error if a non-nullish readConcern was parsed, but do not process
// readConcern regardless.
- if (!readConcern.getOpTime().isNull() ||
- readConcern.getLevel() != repl::ReadConcernLevel::kLocalReadConcern) {
+ if (!readConcernArgs.getOpTime().isNull() ||
+ readConcernArgs.getLevel() != repl::ReadConcernLevel::kLocalReadConcern) {
replyBuilder->setMetadata(rpc::makeEmptyMetadata())
.setCommandReply({ErrorCodes::InvalidOptions,
str::stream()
@@ -1344,7 +1344,7 @@ bool Command::run(OperationContext* txn,
// Skip waiting for the OpTime when testing snapshot behavior.
if (!testingSnapshotBehaviorInIsolation) {
// Wait for readConcern to be satisfied.
- auto readConcernResult = replCoord->waitUntilOpTime(txn, readConcern);
+ auto readConcernResult = replCoord->waitUntilOpTime(txn, readConcernArgs);
readConcernResult.appendInfo(&replyBuilderBob);
if (!readConcernResult.getStatus().isOK()) {
replyBuilder->setMetadata(rpc::makeEmptyMetadata())
@@ -1356,12 +1356,12 @@ bool Command::run(OperationContext* txn,
if ((replCoord->getReplicationMode() ==
repl::ReplicationCoordinator::Mode::modeReplSet ||
testingSnapshotBehaviorInIsolation) &&
- readConcern.getLevel() == repl::ReadConcernLevel::kMajorityReadConcern) {
+ readConcernArgs.getLevel() == repl::ReadConcernLevel::kMajorityReadConcern) {
Status status = txn->recoveryUnit()->setReadFromMajorityCommittedSnapshot();
// Wait until a snapshot is available.
while (status == ErrorCodes::ReadConcernMajorityNotAvailableYet) {
- replCoord->waitForNewSnapshot(txn);
+ replCoord->waitUntilSnapshotCommitted(txn, SnapshotName::min());
status = txn->recoveryUnit()->setReadFromMajorityCommittedSnapshot();
}
@@ -1392,7 +1392,7 @@ bool Command::run(OperationContext* txn,
repl::OpTime lastOpTimeFromClient =
repl::ReplClientInfo::forClient(txn->getClient()).getLastOp();
replCoord->prepareReplResponseMetadata(
- request, lastOpTimeFromClient, readConcern, &metadataBob);
+ request, lastOpTimeFromClient, readConcernArgs, &metadataBob);
// For commands from mongos, append some info to help getLastError(w) work.
// TODO: refactor out of here as part of SERVER-18326
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index c9efff9a7ab..2c0d88a8277 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -703,10 +703,12 @@ public:
virtual void onSnapshotCreate(OpTime timeOfSnapshot, SnapshotName name) = 0;
/**
- * Called by threads that wish to be alerted of the creation of a new snapshot. "txn" is used
- * to checkForInterrupt and enforce maxTimeMS.
+ * Blocks until either the current committed snapshot is at least as high as 'untilSnapshot',
+ * or we are interrupted for any reason, including shutdown or maxTimeMs expiration.
+ * 'txn' is used to checkForInterrupt and enforce maxTimeMS.
*/
- virtual void waitForNewSnapshot(OperationContext* txn) = 0;
+ virtual void waitUntilSnapshotCommitted(OperationContext* txn,
+ const SnapshotName& untilSnapshot) = 0;
/**
* Resets all information related to snapshotting.
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 98f0e52d107..ec575a7d40c 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -673,6 +673,12 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) {
fassert(28665, lastOpTime.getStatus().isOK());
_setFirstOpTimeOfMyTerm(lastOpTime.getValue());
+ lk.lock();
+ // Must calculate the commit level again because firstOpTimeOfMyTerm wasn't set when we logged
+ // our election in logTransitionToPrimaryToOplog(), above.
+ _updateLastCommittedOpTime_inlock();
+ lk.unlock();
+
log() << "transition to primary complete; database writes are now permitted" << rsLog;
}
@@ -892,23 +898,18 @@ ReadConcernResponse ReplicationCoordinatorImpl::waitUntilOpTime(OperationContext
"--enableMajorityReadConcern."));
}
- const auto ts = settings.getOpTime();
- // Note that if 'settings' has no explicit after-optime, 'ts' will be the earliest
- // possible optime, which means the comparisons with 'ts' below are always false. This is
- // intentional.
+ const auto targetOpTime = settings.getOpTime();
+ if (targetOpTime.isNull()) {
+ return ReadConcernResponse(Status::OK(), Milliseconds(0));
+ }
if (getReplicationMode() != repl::ReplicationCoordinator::modeReplSet) {
// For master/slave and standalone nodes, readAfterOpTime is not supported, so we return
// an error. However, we consider all writes "committed" and can treat MajorityReadConcern
// as LocalReadConcern, which is immediately satisfied since there is no OpTime to wait for.
- if (!ts.isNull()) {
- return ReadConcernResponse(
- Status(ErrorCodes::NotAReplicaSet,
- "node needs to be a replica set member to use read concern"));
-
- } else {
- return ReadConcernResponse(Status::OK(), Milliseconds(0));
- }
+ return ReadConcernResponse(
+ Status(ErrorCodes::NotAReplicaSet,
+ "node needs to be a replica set member to use read concern"));
}
Timer timer;
@@ -919,10 +920,10 @@ ReadConcernResponse ReplicationCoordinatorImpl::waitUntilOpTime(OperationContext
"Current storage engine does not support majority readConcerns"));
}
- auto loopCondition = [this, isMajorityReadConcern, ts] {
+ auto loopCondition = [this, isMajorityReadConcern, targetOpTime] {
return isMajorityReadConcern
- ? !_currentCommittedSnapshot || ts > _currentCommittedSnapshot->opTime
- : ts > _getMyLastOptime_inlock();
+ ? !_currentCommittedSnapshot || targetOpTime > _currentCommittedSnapshot->opTime
+ : targetOpTime > _getMyLastOptime_inlock();
};
while (loopCondition()) {
@@ -942,7 +943,7 @@ ReadConcernResponse ReplicationCoordinatorImpl::waitUntilOpTime(OperationContext
WaiterInfo waitInfo(isMajorityReadConcern ? &_replicationWaiterList : &_opTimeWaiterList,
txn->getOpID(),
- &ts,
+ &targetOpTime,
isMajorityReadConcern ? &writeConcern : nullptr,
&condVar);
@@ -1037,8 +1038,8 @@ Status ReplicationCoordinatorImpl::_setLastOptime_inlock(const UpdatePositionArg
void ReplicationCoordinatorImpl::interrupt(unsigned opId) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- // Wake ops waiting for a new snapshot.
- _snapshotCreatedCond.notify_all();
+ // Wake ops waiting for a new committed snapshot.
+ _currentCommittedSnapshotCond.notify_all();
for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin();
it != _replicationWaiterList.end();
@@ -1062,8 +1063,8 @@ void ReplicationCoordinatorImpl::interrupt(unsigned opId) {
void ReplicationCoordinatorImpl::interruptAll() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- // Wake ops waiting for a new snapshot.
- _snapshotCreatedCond.notify_all();
+ // Wake ops waiting for a new committed snapshot.
+ _currentCommittedSnapshotCond.notify_all();
for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin();
it != _replicationWaiterList.end();
@@ -2781,13 +2782,11 @@ void ReplicationCoordinatorImpl::_updateLastCommittedOpTime_inlock() {
if (votingNodesOpTimes.size() < static_cast<unsigned long>(_rsConfig.getWriteMajority())) {
return;
}
-
std::sort(votingNodesOpTimes.begin(), votingNodesOpTimes.end());
// need the majority to have this OpTime
OpTime committedOpTime =
votingNodesOpTimes[votingNodesOpTimes.size() - _rsConfig.getWriteMajority()];
-
_setLastCommittedOpTime_inlock(committedOpTime);
}
@@ -3166,17 +3165,25 @@ void ReplicationCoordinatorImpl::forceSnapshotCreation() {
_externalState->forceSnapshotCreation();
}
-void ReplicationCoordinatorImpl::waitForNewSnapshot(OperationContext* txn) {
+void ReplicationCoordinatorImpl::waitUntilSnapshotCommitted(OperationContext* txn,
+ const SnapshotName& untilSnapshot) {
stdx::unique_lock<stdx::mutex> lock(_mutex);
- _snapshotCreatedCond.wait_for(lock, Microseconds(txn->getRemainingMaxTimeMicros()));
- txn->checkForInterrupt();
+
+ while (!_currentCommittedSnapshot || _currentCommittedSnapshot->name < untilSnapshot) {
+ Microseconds waitTime(txn->getRemainingMaxTimeMicros());
+ if (waitTime == Microseconds(0)) {
+ _currentCommittedSnapshotCond.wait(lock);
+ } else {
+ _currentCommittedSnapshotCond.wait_for(lock, waitTime);
+ }
+ txn->checkForInterrupt();
+ }
}
void ReplicationCoordinatorImpl::onSnapshotCreate(OpTime timeOfSnapshot, SnapshotName name) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
auto snapshotInfo = SnapshotInfo{timeOfSnapshot, name};
- _snapshotCreatedCond.notify_all();
if (timeOfSnapshot <= _lastCommittedOpTime) {
// This snapshot is ready to be marked as committed.
@@ -3208,6 +3215,7 @@ void ReplicationCoordinatorImpl::_updateCommittedSnapshot_inlock(
invariant(newCommittedSnapshot < _uncommittedSnapshots.front());
_currentCommittedSnapshot = newCommittedSnapshot;
+ _currentCommittedSnapshotCond.notify_all();
_externalState->updateCommittedSnapshot(newCommittedSnapshot.name);
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index a8b11b2a727..05c594d92ee 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -300,7 +300,8 @@ public:
virtual OpTime getCurrentCommittedSnapshotOpTime() override;
- virtual void waitForNewSnapshot(OperationContext* txn) override;
+ virtual void waitUntilSnapshotCommitted(OperationContext* txn,
+ const SnapshotName& untilSnapshot) override;
virtual void appendConnectionStats(BSONObjBuilder* b) override;
@@ -1256,9 +1257,6 @@ private:
// Used to signal threads waiting for changes to _rsConfigState.
stdx::condition_variable _rsConfigStateChange; // (M)
- // Used to signal threads that are waiting for new snapshots.
- stdx::condition_variable _snapshotCreatedCond; // (M)
-
// Represents the configuration state of the coordinator, which controls how and when
// _rsConfig may change. See the state transition diagram in the type definition of
// ConfigState for details.
@@ -1335,6 +1333,9 @@ private:
// When engaged, this must be <= _lastCommittedOpTime and < _uncommittedSnapshots.front().
boost::optional<SnapshotInfo> _currentCommittedSnapshot; // (M)
+ // Used to signal threads that are waiting for new committed snapshots.
+ stdx::condition_variable _currentCommittedSnapshotCond; // (M)
+
// The cached current term. It's in sync with the term in topology coordinator.
long long _cachedTerm = OpTime::kUninitializedTerm; // (M)
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index 077e64b3c36..312eaf0a257 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -391,7 +391,8 @@ OpTime ReplicationCoordinatorMock::getCurrentCommittedSnapshotOpTime() {
return OpTime();
}
-void ReplicationCoordinatorMock::waitForNewSnapshot(OperationContext* txn) {}
+void ReplicationCoordinatorMock::waitUntilSnapshotCommitted(OperationContext* txn,
+ const SnapshotName& untilSnapshot) {}
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 8373f79dae7..d6050d634a0 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -228,7 +228,8 @@ public:
virtual OpTime getCurrentCommittedSnapshotOpTime() override;
- virtual void waitForNewSnapshot(OperationContext* txn) override;
+ virtual void waitUntilSnapshotCommitted(OperationContext* txn,
+ const SnapshotName& untilSnapshot) override;
private:
AtomicUInt64 _snapshotNameGenerator;
diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h
index f2bafcc944b..47e25f94dc1 100644
--- a/src/mongo/db/storage/recovery_unit.h
+++ b/src/mongo/db/storage/recovery_unit.h
@@ -94,7 +94,7 @@ public:
* change snapshots.
*
* If no snapshot has yet been marked as Majority Committed, returns a status with error
- * code ReadConcernNotAvailableYet. After this returns successfully, at any point where
+ * code ReadConcernMajorityNotAvailableYet. After this returns successfully, at any point where
* implementations attempt to acquire committed snapshot, if there are none available due to a
* call to SnapshotManager::dropAllSnapshots(), a UserException with the same code should be
* thrown.