author     Scott Hernandez <scotthernandez@tart.local>  2016-03-22 11:42:41 -0400
committer  Scott Hernandez <scotthernandez@tart.local>  2016-03-27 11:45:02 -0400
commit     ec1aaf5ce52bed2897e80cbba7add95c068809dc (patch)
tree       b6b77429c32ff66454582aaa3cda5882d0f8c9bf /src
parent     7a5d4214ca939ff3e522a493d848f9f35ac88d5c (diff)
SERVER-23010: do not update durable OpTime, and update committed snapshot, if not durable
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/repl/minvalid.cpp                                 8
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp            54
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp   13
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp       2
4 files changed, 50 insertions, 27 deletions
diff --git a/src/mongo/db/repl/minvalid.cpp b/src/mongo/db/repl/minvalid.cpp
index 90753cff0f4..4add8bc4e38 100644
--- a/src/mongo/db/repl/minvalid.cpp
+++ b/src/mongo/db/repl/minvalid.cpp
@@ -64,9 +64,11 @@ void clearInitialSyncFlag(OperationContext* txn) {
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "clearInitialSyncFlags", minvalidNS);
auto replCoord = repl::ReplicationCoordinator::get(txn);
- OpTime time = replCoord->getMyLastAppliedOpTime();
- txn->recoveryUnit()->waitUntilDurable();
- replCoord->setMyLastDurableOpTime(time);
+ if (getGlobalServiceContext()->getGlobalStorageEngine()->isDurable()) {
+ OpTime time = replCoord->getMyLastAppliedOpTime();
+ txn->recoveryUnit()->waitUntilDurable();
+ replCoord->setMyLastDurableOpTime(time);
+ }
LOG(3) << "clearing initial sync flag";
}
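
For context, a minimal standalone sketch of the guard this hunk introduces. The StorageEngine, ReplCoordinator, and OpTime types below are simplified stand-ins for illustration, not the real MongoDB classes; the point is that the durable optime is only advanced when the engine can actually make writes durable.

#include <cstdint>

struct OpTime {
    std::int64_t ts = 0;
};

struct StorageEngine {
    bool durable = false;
    bool isDurable() const { return durable; }
    void waitUntilDurable() { /* the real engine blocks here until the journal is flushed */ }
};

struct ReplCoordinator {
    OpTime lastApplied;
    OpTime lastDurable;
    OpTime getMyLastAppliedOpTime() const { return lastApplied; }
    void setMyLastDurableOpTime(OpTime t) { lastDurable = t; }
};

void clearInitialSyncFlag(StorageEngine& engine, ReplCoordinator& coord) {
    // Only advance the durable optime when the engine can actually journal;
    // a non-durable engine never has a meaningful "durable" point in time.
    if (engine.isDurable()) {
        OpTime time = coord.getMyLastAppliedOpTime();  // capture before waiting
        engine.waitUntilDurable();                     // block until flushed
        coord.setMyLastDurableOpTime(time);
    }
    // The flag write itself happens earlier in the real function, regardless of durability.
}
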
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index d598d583d82..6e2b63bb1da 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -807,6 +807,7 @@ void ReplicationCoordinatorImpl::_updateSlaveInfoAppliedOpTime_inlock(SlaveInfo*
slaveInfo->lastUpdate = _replExecutor.now();
slaveInfo->down = false;
+ _updateLastCommittedOpTime_inlock();
// Wake up any threads waiting for replication that now have their replication
// check satisfied
_wakeReadyWaiters_inlock();
@@ -924,12 +925,6 @@ void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeForward(const OpTime& opT
if (opTime > _getMyLastAppliedOpTime_inlock()) {
const bool allowRollback = false;
_setMyLastAppliedOpTime_inlock(opTime, allowRollback);
-
- // If the storage engine isn't durable then we need to update the durableOpTime.
- if (!_isDurableStorageEngine()) {
- _setMyLastDurableOpTime_inlock(opTime, allowRollback);
- }
-
_reportUpstream_inlock(std::move(lock));
}
}
@@ -1072,19 +1067,34 @@ ReadConcernResponse ReplicationCoordinatorImpl::waitUntilOpTime(OperationContext
Milliseconds(timer.millis()));
}
+ // Now we have to wait to be notified when
+ // either the optime or the committed snapshot changes.
+
+ // If we are doing a majority read concern we only need to wait
+ // for a new snapshot.
+ if (isMajorityReadConcern) {
+ // Wait for a snapshot that meets our needs (< targetOpTime).
+ const auto waitTime = CurOp::get(txn)->isMaxTimeSet()
+ ? Microseconds(txn->getRemainingMaxTimeMicros())
+ : Microseconds{0};
+ const auto waitForever = waitTime == Microseconds{0};
+ LOG(2) << "waitUntilOpTime: waiting for a new snapshot to occur for micros: "
+ << waitTime;
+ if (!waitForever) {
+ _currentCommittedSnapshotCond.wait_for(lock, waitTime);
+ } else {
+ _currentCommittedSnapshotCond.wait(lock);
+ }
+
+ LOG(3) << "Got notified of new snapshot: " << _currentCommittedSnapshot->toString();
+ continue;
+ }
+
+ // We just need to wait for the opTime to catch up to what we need (not majority RC).
stdx::condition_variable condVar;
- WriteConcernOptions writeConcern;
- writeConcern.wMode = WriteConcernOptions::kMajority;
- writeConcern.syncMode = getWriteConcernMajorityShouldJournal_inlock()
- ? WriteConcernOptions::SyncMode::JOURNAL
- : WriteConcernOptions::SyncMode::NONE;
-
- WaiterInfo waitInfo(isMajorityReadConcern ? &_replicationWaiterList : &_opTimeWaiterList,
- txn->getOpID(),
- &targetOpTime,
- isMajorityReadConcern ? &writeConcern : nullptr,
- &condVar);
+ WaiterInfo waitInfo(&_opTimeWaiterList, txn->getOpID(), &targetOpTime, nullptr, &condVar);
+ LOG(3) << "Waiting for OpTime: " << waitInfo;
if (CurOp::get(txn)->isMaxTimeSet()) {
condVar.wait_for(lock, Microseconds(txn->getRemainingMaxTimeMicros()));
} else {
@@ -3241,20 +3251,26 @@ void ReplicationCoordinatorImpl::_setLastCommittedOpTime(const OpTime& committed
}
void ReplicationCoordinatorImpl::_setLastCommittedOpTime_inlock(const OpTime& committedOpTime) {
- if (committedOpTime <= _lastCommittedOpTime) {
+ if (committedOpTime == _lastCommittedOpTime) {
+ return; // Hasn't changed, so ignore it.
+ } else if (committedOpTime < _lastCommittedOpTime) {
+ LOG(1) << "Ignoring older committed snapshot optime: " << committedOpTime
+ << ", currentCommittedOpTime: " << _lastCommittedOpTime;
return; // This may have come from an out-of-order heartbeat. Ignore it.
}
// This check is performed to ensure primaries do not commit an OpTime from a previous term.
if (_getMemberState_inlock().primary() && committedOpTime < _firstOpTimeOfMyTerm) {
+ LOG(1) << "Ignoring older committed snapshot from before I became primary, optime: "
+ << committedOpTime << ", firstOpTimeOfMyTerm: " << _firstOpTimeOfMyTerm;
return;
}
if (_getMemberState_inlock().arbiter()) {
_setMyLastAppliedOpTime_inlock(committedOpTime, false);
- _setMyLastDurableOpTime_inlock(committedOpTime, false);
}
+ LOG(2) << "Updating _lastCommittedOpTime to " << committedOpTime;
_lastCommittedOpTime = committedOpTime;
_externalState->notifyOplogMetadataWaiters();
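
A standalone sketch of the majority read concern wait added in the waitUntilOpTime hunk above. The SnapshotWaiter type and its members are simplified stand-ins for the real coordinator state; a zero wait time means "no deadline", mirroring the waitForever branch, and the caller re-checks the snapshot optime in an outer loop after waking.

#include <chrono>
#include <condition_variable>
#include <mutex>

struct SnapshotWaiter {
    std::mutex mtx;
    std::condition_variable committedSnapshotCond;

    // Block until notifyNewSnapshot() fires or the optional deadline expires.
    void waitForNewSnapshot(std::chrono::microseconds waitTime) {
        std::unique_lock<std::mutex> lock(mtx);
        const bool waitForever = (waitTime == std::chrono::microseconds{0});
        if (waitForever) {
            committedSnapshotCond.wait(lock);
        } else {
            committedSnapshotCond.wait_for(lock, waitTime);
        }
        // Both paths can wake spuriously or time out, which is why the real
        // code loops and re-examines the committed snapshot afterwards.
    }

    // Called by whatever publishes a new committed snapshot.
    void notifyNewSnapshot() {
        committedSnapshotCond.notify_all();
    }
};
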
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp
index 61c1110a58b..acc9f8cf2ec 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp
@@ -124,9 +124,11 @@ void ReplicationCoordinatorImpl::_startElectSelfV1() {
invariant(_rsConfig.getMemberAt(_selfIndex).isElectable());
- OpTime lastOpTimeDurable(_getMyLastDurableOpTime_inlock());
+ // Note: If we aren't durable, send last applied.
+ const auto lastOpTime = _isDurableStorageEngine() ? _getMyLastDurableOpTime_inlock()
+ : _getMyLastAppliedOpTime_inlock();
- if (lastOpTimeDurable == OpTime()) {
+ if (lastOpTime == OpTime()) {
log() << "not trying to elect self, "
"do not yet have a complete set of data from any point in time";
return;
@@ -147,7 +149,7 @@ void ReplicationCoordinatorImpl::_startElectSelfV1() {
_selfIndex,
_topCoord->getTerm(),
true, // dry run
- getMyLastDurableOpTime(),
+ lastOpTime,
stdx::bind(&ReplicationCoordinatorImpl::_onDryRunComplete, this, term));
if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
return;
@@ -238,6 +240,9 @@ void ReplicationCoordinatorImpl::_startVoteRequester(long long newTerm) {
invariant(!_electionWinnerDeclarer);
LoseElectionGuardV1 lossGuard(this);
+ const auto lastOpTime =
+ _isDurableStorageEngine() ? getMyLastDurableOpTime() : getMyLastAppliedOpTime();
+
_voteRequester.reset(new VoteRequester);
StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = _voteRequester->start(
&_replExecutor,
@@ -245,7 +250,7 @@ void ReplicationCoordinatorImpl::_startVoteRequester(long long newTerm) {
_selfIndex,
_topCoord->getTerm(),
false,
- getMyLastDurableOpTime(),
+ lastOpTime,
stdx::bind(&ReplicationCoordinatorImpl::_onVoteRequestComplete, this, newTerm));
if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
return;
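
A standalone sketch of the optime selection the election hunks apply, with OpTime and the state holder simplified for illustration: a node on a non-durable engine stands for election with its last applied optime, because the commit stops faking the durable optime for such engines (the lines removed from setMyLastAppliedOpTimeForward), so the durable optime would otherwise stay at the null OpTime.

#include <iostream>

struct OpTime {
    long long term = 0;
    long long ts = 0;
    bool isNull() const { return term == 0 && ts == 0; }
};

struct ReplState {
    bool durableEngine = false;  // stand-in for _isDurableStorageEngine()
    OpTime lastApplied;
    OpTime lastDurable;

    // Note: if we aren't durable, send last applied.
    OpTime electionOpTime() const {
        return durableEngine ? lastDurable : lastApplied;
    }
};

int main() {
    ReplState state;
    state.lastApplied = {1, 42};  // entries applied, nothing journaled
    if (state.electionOpTime().isNull()) {
        std::cout << "not trying to elect self\n";
    } else {
        std::cout << "can stand for election with the applied optime\n";
    }
}
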
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
index e365741e12b..51749294c3d 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -257,7 +257,7 @@ WiredTigerKVEngine::WiredTigerKVEngine(const std::string& canonicalName,
_sessionCache.reset(new WiredTigerSessionCache(this));
- if (_durable) {
+ if (_durable && !_ephemeral) {
_journalFlusher = stdx::make_unique<WiredTigerJournalFlusher>(_sessionCache.get());
_journalFlusher->go();
}
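
Finally, a minimal sketch of the WiredTiger guard above, assuming simplified Engine and JournalFlusher stand-ins rather than the real classes: the background journal flusher thread is only started when the engine is durable and not ephemeral (in-memory), since an in-memory engine has no journal to flush.

#include <atomic>
#include <chrono>
#include <memory>
#include <thread>

class JournalFlusher {
public:
    void go() {
        _thread = std::thread([this] {
            while (!_stop) {
                // The real flusher forces a journal flush on a fixed cadence.
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
            }
        });
    }
    ~JournalFlusher() {
        _stop = true;
        if (_thread.joinable()) {
            _thread.join();
        }
    }

private:
    std::atomic<bool> _stop{false};
    std::thread _thread;
};

struct Engine {
    bool durable;
    bool ephemeral;
    std::unique_ptr<JournalFlusher> journalFlusher;

    Engine(bool isDurable, bool isEphemeral) : durable(isDurable), ephemeral(isEphemeral) {
        if (durable && !ephemeral) {  // the condition the patch tightens
            journalFlusher = std::make_unique<JournalFlusher>();
            journalFlusher->go();
        }
    }
};
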