author     Scott Hernandez <scotthernandez@tart.local>   2016-03-22 11:42:41 -0400
committer  Scott Hernandez <scotthernandez@tart.local>   2016-03-27 11:45:02 -0400
commit     ec1aaf5ce52bed2897e80cbba7add95c068809dc (patch)
tree       b6b77429c32ff66454582aaa3cda5882d0f8c9bf /src
parent     7a5d4214ca939ff3e522a493d848f9f35ac88d5c (diff)
download   mongo-ec1aaf5ce52bed2897e80cbba7add95c068809dc.tar.gz
SERVER-23010: do not update durable OpTime, and update committed snapshot, if not durable
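At its core, the patch gates every durable-OpTime update on whether the storage engine is actually durable, so a non-journaling node never advertises durable progress. A minimal standalone sketch of that guard follows; OpTime, StorageEngine, and ReplCoord here are simplified stand-ins for the real MongoDB classes, not the actual API:

#include <iostream>

struct OpTime {
    long long ts = 0;
};

struct StorageEngine {
    bool durable = false;  // e.g. false for an in-memory engine
    bool isDurable() const { return durable; }
};

struct ReplCoord {
    OpTime lastApplied;
    OpTime lastDurable;

    // Advance the applied optime unconditionally, but only advance the
    // durable optime when the engine can actually journal writes: a
    // non-durable node must never report durable progress.
    void onApplied(const StorageEngine& engine, OpTime opTime) {
        lastApplied = opTime;
        if (engine.isDurable()) {
            // The real code blocks on waitUntilDurable() first.
            lastDurable = opTime;
        }
    }
};

int main() {
    StorageEngine inMemory{false};
    ReplCoord coord;
    coord.onApplied(inMemory, OpTime{42});
    // Prints "applied=42 durable=0": durable never advances on this engine.
    std::cout << "applied=" << coord.lastApplied.ts << " durable=" << coord.lastDurable.ts << "\n";
}

On a durable engine the same call would advance both optimes; the real code additionally waits for the journal before bumping the durable optime, as the minvalid.cpp hunk below shows.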
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/repl/minvalid.cpp                               |  8
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp           | 54
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp  | 13
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp     |  2
4 files changed, 50 insertions, 27 deletions
diff --git a/src/mongo/db/repl/minvalid.cpp b/src/mongo/db/repl/minvalid.cpp
index 90753cff0f4..4add8bc4e38 100644
--- a/src/mongo/db/repl/minvalid.cpp
+++ b/src/mongo/db/repl/minvalid.cpp
@@ -64,9 +64,11 @@ void clearInitialSyncFlag(OperationContext* txn) {
     MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "clearInitialSyncFlags", minvalidNS);
 
     auto replCoord = repl::ReplicationCoordinator::get(txn);
-    OpTime time = replCoord->getMyLastAppliedOpTime();
-    txn->recoveryUnit()->waitUntilDurable();
-    replCoord->setMyLastDurableOpTime(time);
+    if (getGlobalServiceContext()->getGlobalStorageEngine()->isDurable()) {
+        OpTime time = replCoord->getMyLastAppliedOpTime();
+        txn->recoveryUnit()->waitUntilDurable();
+        replCoord->setMyLastDurableOpTime(time);
+    }
 
     LOG(3) << "clearing initial sync flag";
 }
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index d598d583d82..6e2b63bb1da 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -807,6 +807,7 @@ void ReplicationCoordinatorImpl::_updateSlaveInfoAppliedOpTime_inlock(SlaveInfo*
     slaveInfo->lastUpdate = _replExecutor.now();
     slaveInfo->down = false;
 
+    _updateLastCommittedOpTime_inlock();
     // Wake up any threads waiting for replication that now have their replication
     // check satisfied
     _wakeReadyWaiters_inlock();
@@ -924,12 +925,6 @@ void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeForward(const OpTime& opT
     if (opTime > _getMyLastAppliedOpTime_inlock()) {
         const bool allowRollback = false;
         _setMyLastAppliedOpTime_inlock(opTime, allowRollback);
-
-        // If the storage engine isn't durable then we need to update the durableOpTime.
-        if (!_isDurableStorageEngine()) {
-            _setMyLastDurableOpTime_inlock(opTime, allowRollback);
-        }
-
         _reportUpstream_inlock(std::move(lock));
     }
 }
@@ -1072,19 +1067,34 @@ ReadConcernResponse ReplicationCoordinatorImpl::waitUntilOpTime(OperationContext
                                       Milliseconds(timer.millis()));
         }
 
+        // Now we have to wait to be notified when
+        // either the optime or committed snapshot change.
+
+        // If we are doing a majority read concern we only need to wait
+        // for a new snapshot.
+        if (isMajorityReadConcern) {
+            // Wait for a snapshot that meets our needs (< targetOpTime).
+            const auto waitTime = CurOp::get(txn)->isMaxTimeSet()
+                ? Microseconds(txn->getRemainingMaxTimeMicros())
+                : Microseconds{0};
+            const auto waitForever = waitTime == Microseconds{0};
+            LOG(2) << "waitUntilOpTime: waiting for a new snapshot to occur for micros: "
+                   << waitTime;
+            if (!waitForever) {
+                _currentCommittedSnapshotCond.wait_for(lock, waitTime);
+            } else {
+                _currentCommittedSnapshotCond.wait(lock);
+            }
+
+            LOG(3) << "Got notified of new snapshot: " << _currentCommittedSnapshot->toString();
+            continue;
+        }
+
+        // We just need to wait for the opTime to catch up to what we need (not majority RC).
         stdx::condition_variable condVar;
-        WriteConcernOptions writeConcern;
-        writeConcern.wMode = WriteConcernOptions::kMajority;
-        writeConcern.syncMode = getWriteConcernMajorityShouldJournal_inlock()
-            ? WriteConcernOptions::SyncMode::JOURNAL
-            : WriteConcernOptions::SyncMode::NONE;
-
-        WaiterInfo waitInfo(isMajorityReadConcern ? &_replicationWaiterList : &_opTimeWaiterList,
-                            txn->getOpID(),
-                            &targetOpTime,
-                            isMajorityReadConcern ? &writeConcern : nullptr,
-                            &condVar);
+        WaiterInfo waitInfo(&_opTimeWaiterList, txn->getOpID(), &targetOpTime, nullptr, &condVar);
+        LOG(3) << "Waiting for OpTime: " << waitInfo;
         if (CurOp::get(txn)->isMaxTimeSet()) {
             condVar.wait_for(lock, Microseconds(txn->getRemainingMaxTimeMicros()));
         } else {
@@ -3241,20 +3251,26 @@ void ReplicationCoordinatorImpl::_setLastCommittedOpTime(const OpTime& committed
 }
 
 void ReplicationCoordinatorImpl::_setLastCommittedOpTime_inlock(const OpTime& committedOpTime) {
-    if (committedOpTime <= _lastCommittedOpTime) {
+    if (committedOpTime == _lastCommittedOpTime) {
+        return;  // Hasn't changed, so ignore it.
+    } else if (committedOpTime < _lastCommittedOpTime) {
+        LOG(1) << "Ignoring older committed snapshot optime: " << committedOpTime
+               << ", currentCommittedOpTime: " << _lastCommittedOpTime;
         return;  // This may have come from an out-of-order heartbeat. Ignore it.
     }
 
     // This check is performed to ensure primaries do not commit an OpTime from a previous term.
     if (_getMemberState_inlock().primary() && committedOpTime < _firstOpTimeOfMyTerm) {
+        LOG(1) << "Ignoring older committed snapshot from before I became primary, optime: "
+               << committedOpTime << ", firstOpTimeOfMyTerm: " << _firstOpTimeOfMyTerm;
         return;
     }
 
     if (_getMemberState_inlock().arbiter()) {
         _setMyLastAppliedOpTime_inlock(committedOpTime, false);
-        _setMyLastDurableOpTime_inlock(committedOpTime, false);
     }
 
+    LOG(2) << "Updating _lastCommittedOpTime to " << committedOpTime;
     _lastCommittedOpTime = committedOpTime;
     _externalState->notifyOplogMetadataWaiters();
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp
index 61c1110a58b..acc9f8cf2ec 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp
@@ -124,9 +124,11 @@ void ReplicationCoordinatorImpl::_startElectSelfV1() {
     invariant(_rsConfig.getMemberAt(_selfIndex).isElectable());
 
-    OpTime lastOpTimeDurable(_getMyLastDurableOpTime_inlock());
+    // Note: If we aren't durable, send last applied.
+    const auto lastOpTime = _isDurableStorageEngine() ? _getMyLastDurableOpTime_inlock()
+                                                      : _getMyLastAppliedOpTime_inlock();
 
-    if (lastOpTimeDurable == OpTime()) {
+    if (lastOpTime == OpTime()) {
         log() << "not trying to elect self, "
                  "do not yet have a complete set of data from any point in time";
         return;
@@ -147,7 +149,7 @@ void ReplicationCoordinatorImpl::_startElectSelfV1() {
         _selfIndex,
         _topCoord->getTerm(),
         true,  // dry run
-        getMyLastDurableOpTime(),
+        lastOpTime,
         stdx::bind(&ReplicationCoordinatorImpl::_onDryRunComplete, this, term));
     if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
         return;
@@ -238,6 +240,9 @@ void ReplicationCoordinatorImpl::_startVoteRequester(long long newTerm) {
     invariant(!_electionWinnerDeclarer);
     LoseElectionGuardV1 lossGuard(this);
 
+    const auto lastOpTime =
+        _isDurableStorageEngine() ? getMyLastDurableOpTime() : getMyLastAppliedOpTime();
+
    _voteRequester.reset(new VoteRequester);
     StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = _voteRequester->start(
         &_replExecutor,
@@ -245,7 +250,7 @@ void ReplicationCoordinatorImpl::_startVoteRequester(long long newTerm) {
         _selfIndex,
         _topCoord->getTerm(),
         false,
-        getMyLastDurableOpTime(),
+        lastOpTime,
         stdx::bind(&ReplicationCoordinatorImpl::_onVoteRequestComplete, this, newTerm));
     if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
         return;
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
index e365741e12b..51749294c3d 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -257,7 +257,7 @@ WiredTigerKVEngine::WiredTigerKVEngine(const std::string& canonicalName,
 
     _sessionCache.reset(new WiredTigerSessionCache(this));
 
-    if (_durable) {
+    if (_durable && !_ephemeral) {
         _journalFlusher = stdx::make_unique<WiredTigerJournalFlusher>(_sessionCache.get());
         _journalFlusher->go();
     }
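The most intricate change is the rewritten wait loop in waitUntilOpTime: a majority read concern now blocks on the committed-snapshot condition variable (optionally bounded by the remaining maxTimeMS), while a plain read concern registers a simple opTime waiter. The following condensed sketch shows that two-condition-variable pattern using std:: primitives and integer optimes in place of MongoDB's stdx wrappers and OpTime class; it illustrates the control flow only and is not the coordinator's real interface:

#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

using Microseconds = std::chrono::microseconds;

class OpTimeWaiter {
public:
    // Blocks until the committed optime (majority reads) or the applied
    // optime (plain reads) reaches `target`. A maxTime of zero means
    // "wait forever", mirroring the patch's waitForever flag.
    bool waitUntil(long long target, bool majorityReadConcern, Microseconds maxTime) {
        std::unique_lock<std::mutex> lock(_mutex);
        for (;;) {
            const long long current = majorityReadConcern ? _committedOpTime : _appliedOpTime;
            if (current >= target)
                return true;

            // Majority reads wait for a new committed snapshot; plain reads
            // wait on the opTime waiter condvar.
            auto& cond = majorityReadConcern ? _newSnapshotCond : _opTimeCond;
            if (maxTime == Microseconds{0}) {
                cond.wait(lock);
            } else if (cond.wait_for(lock, maxTime) == std::cv_status::timeout) {
                // Deadline expired: report whether the optime caught up in time.
                return (majorityReadConcern ? _committedOpTime : _appliedOpTime) >= target;
            }
        }
    }

    void advanceApplied(long long opTime) {
        {
            std::lock_guard<std::mutex> lk(_mutex);
            _appliedOpTime = opTime;
        }
        _opTimeCond.notify_all();
    }

    void advanceCommitted(long long opTime) {
        {
            std::lock_guard<std::mutex> lk(_mutex);
            _committedOpTime = opTime;
        }
        _newSnapshotCond.notify_all();
    }

private:
    std::mutex _mutex;
    std::condition_variable _newSnapshotCond;  // cf. _currentCommittedSnapshotCond
    std::condition_variable _opTimeCond;       // cf. the per-waiter condVar
    long long _appliedOpTime = 0;
    long long _committedOpTime = 0;
};

int main() {
    OpTimeWaiter waiter;
    std::thread replicator([&] { waiter.advanceCommitted(100); });
    // A majority read at optime 100 returns once the committed optime catches up.
    waiter.waitUntil(100, /*majorityReadConcern=*/true, Microseconds{0});
    replicator.join();
}

Because the predicate is rechecked under the lock on every wakeup, spurious wakeups and out-of-order notifications are harmless, which is why the patched loop can simply continue after each snapshot notification.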