diff options
author | smani87 <suganthi.mani@mongodb.com> | 2018-07-29 01:17:00 -0400 |
---|---|---|
committer | smani87 <suganthi.mani@mongodb.com> | 2018-07-29 04:29:29 -0400 |
commit | 3ab73b4546be136606e52549da4e93f469d413b3 (patch) | |
tree | 385c178c44dc538079cdd3340bcf4d585c81c255 /src/mongo/db/repl | |
parent | b807f350638757c8833d82fec14a90d7b3a03051 (diff) | |
download | mongo-3ab73b4546be136606e52549da4e93f469d413b3.tar.gz |
SERVER-36052 Remove _rsConfig.getProtocolVersion() check as part of pv0 code cleanup.
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_set_config.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_set_config.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 52 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator.cpp | 76 |
6 files changed, 41 insertions, 117 deletions
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp index 6545bace758..7a0ebb75d2e 100644 --- a/src/mongo/db/repl/oplog_fetcher.cpp +++ b/src/mongo/db/repl/oplog_fetcher.cpp @@ -72,11 +72,7 @@ Milliseconds calculateAwaitDataTimeout(const ReplSetConfig& config) { // Under protocol version 1, make the awaitData timeout (maxTimeMS) dependent on the election // timeout. This enables the sync source to communicate liveness of the primary to secondaries. // We never wait longer than 30 seconds. - // Under protocol version 0, use a default timeout of 2 seconds for awaitData. - if (config.getProtocolVersion() == 1LL) { - return std::min((config.getElectionTimeoutPeriod() / 2), maximumAwaitDataTimeoutMS); - } - return OplogFetcher::kDefaultProtocolZeroAwaitDataTimeout; + return std::min((config.getElectionTimeoutPeriod() / 2), maximumAwaitDataTimeoutMS); } /** diff --git a/src/mongo/db/repl/repl_set_config.cpp b/src/mongo/db/repl/repl_set_config.cpp index d6d4470203e..fcbe828bf2b 100644 --- a/src/mongo/db/repl/repl_set_config.cpp +++ b/src/mongo/db/repl/repl_set_config.cpp @@ -166,19 +166,9 @@ Status ReplSetConfig::_initialize(const BSONObj& cfg, bool forInitiate, OID defa // Parse protocol version // status = bsonExtractIntegerField(cfg, kProtocolVersionFieldName, &_protocolVersion); - if (!status.isOK()) { - if (status != ErrorCodes::NoSuchKey) { - return status; - } - if (forInitiate) { - // Default protocolVersion to 1 when initiating a new set. - _protocolVersion = 1; - } - // If protocolVersion field is missing but this *isn't* for an initiate, leave - // _protocolVersion at it's default of 0 for now. It will error later on, during - // validate(). - // TODO(spencer): Remove this after 4.0, when we no longer need mixed-version support - // with versions that don't always include the protocolVersion field. + // If 'protocolVersion' field is missing for initiate, then _protocolVersion defaults to 1. + if (!(status.isOK() || (status == ErrorCodes::NoSuchKey && forInitiate))) { + return status; } // diff --git a/src/mongo/db/repl/repl_set_config.h b/src/mongo/db/repl/repl_set_config.h index 04bbae18722..7887b95f8be 100644 --- a/src/mongo/db/repl/repl_set_config.h +++ b/src/mongo/db/repl/repl_set_config.h @@ -311,7 +311,7 @@ public: * Gets the protocol version for this configuration. * * The protocol version number currently determines what election protocol is used by the - * cluster; 0 is the default and indicates the old 3.0 election protocol. + * cluster; 1 is the default. */ long long getProtocolVersion() const { return _protocolVersion; @@ -396,7 +396,7 @@ private: int _totalVotingMembers = 0; ReplSetTagConfig _tagConfig; StringMap<ReplSetTagPattern> _customWriteConcernModes; - long long _protocolVersion = 0; + long long _protocolVersion = 1; bool _configServer = false; OID _replicaSetId; ConnectionString _connectionString; diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 51b7313efa9..37ca8a67292 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -577,16 +577,13 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig( } } - long long term = OpTime::kUninitializedTerm; - if (localConfig.getProtocolVersion() == 1) { - // Restore the current term according to the terms of last oplog entry and last vote. - // The initial term of OpTime() is 0. - term = lastOpTime.getTerm(); - if (lastVoteStatus.isOK()) { - long long lastVoteTerm = lastVoteStatus.getValue().getTerm(); - if (term < lastVoteTerm) { - term = lastVoteTerm; - } + // Restore the current term according to the terms of last oplog entry and last vote. + // The initial term of OpTime() is 0. + long long term = lastOpTime.getTerm(); + if (lastVoteStatus.isOK()) { + long long lastVoteTerm = lastVoteStatus.getValue().getTerm(); + if (term < lastVoteTerm) { + term = lastVoteTerm; } } @@ -2380,10 +2377,7 @@ Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* opCt // In pv1, the TopologyCoordinator has not set the term yet. It will be set to kInitialTerm if // the initiate succeeds so we pass that here. status = checkQuorumForInitiate( - _replExecutor.get(), - newConfig, - myIndex.getValue(), - newConfig.getProtocolVersion() == 1 ? OpTime::kInitialTerm : OpTime::kUninitializedTerm); + _replExecutor.get(), newConfig, myIndex.getValue(), OpTime::kInitialTerm); if (!status.isOK()) { error() << "replSetInitiate failed; " << status; @@ -2749,6 +2743,7 @@ ReplicationCoordinatorImpl::PostMemberStateUpdateAction ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(OperationContext* opCtx, const ReplSetConfig& newConfig, int myIndex) { + invariant(newConfig.getProtocolVersion() == 1); invariant(_settings.usingReplSets()); _cancelHeartbeats_inlock(); _setConfigState_inlock(kConfigSteady); @@ -2762,17 +2757,6 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(OperationContext* opCtx, _rsConfig = newConfig; _protVersion.store(_rsConfig.getProtocolVersion()); - // Warn if this config has protocol version 0 - if (newConfig.getProtocolVersion() == 0 && - (!oldConfig.isInitialized() || oldConfig.getProtocolVersion() == 1)) { - log() << startupWarningsLog; - log() << "** WARNING: This replica set was configured with protocol version 0." - << startupWarningsLog; - log() << "** This protocol version is deprecated and subject to be removed " - << startupWarningsLog; - log() << "** in a future version." << startupWarningsLog; - } - // Warn if running --nojournal and writeConcernMajorityJournalDefault = false StorageEngine* storageEngine = opCtx->getServiceContext()->getStorageEngine(); if (storageEngine && !storageEngine->isDurable() && @@ -2814,24 +2798,6 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(OperationContext* opCtx, } _updateLastCommittedOpTime_inlock(); - // Set election id if we're primary. - if (oldConfig.isInitialized() && _memberState.primary()) { - if (oldConfig.getProtocolVersion() > newConfig.getProtocolVersion()) { - // Downgrade - invariant(newConfig.getProtocolVersion() == 0); - _electionId = OID::gen(); - auto ts = LogicalClock::get(getServiceContext())->reserveTicks(1).asTimestamp(); - _topCoord->setElectionInfo(_electionId, ts); - } else if (oldConfig.getProtocolVersion() < newConfig.getProtocolVersion()) { - // Upgrade - invariant(newConfig.getProtocolVersion() == 1); - invariant(_topCoord->getTerm() != OpTime::kUninitializedTerm); - _electionId = OID::fromTerm(_topCoord->getTerm()); - auto ts = LogicalClock::get(getServiceContext())->reserveTicks(1).asTimestamp(); - _topCoord->setElectionInfo(_electionId, ts); - } - } - return action; } diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp index 6ecb25d00c8..61a63ca9225 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp @@ -279,8 +279,8 @@ TEST_F(ReplCoordHBV1Test, responseBuilder << "ok" << 1; hbResp.addToBSON(&responseBuilder); net->scheduleResponse( - noi, startDate + Milliseconds(200), makeResponseStatus(responseBuilder.obj())); - assertRunUntil(startDate + Milliseconds(2200)); + noi, startDate + Milliseconds(50), makeResponseStatus(responseBuilder.obj())); + assertRunUntil(startDate + Milliseconds(550)); // Because the new config is stored using an out-of-band thread, we need to perform some // extra synchronization to let the executor finish the heartbeat reconfig. We know that diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp index f4746332d86..5016ddc8e81 100644 --- a/src/mongo/db/repl/topology_coordinator.cpp +++ b/src/mongo/db/repl/topology_coordinator.cpp @@ -134,15 +134,8 @@ bool _hasOnlyAuthErrorUpHeartbeats(const std::vector<MemberData>& hbdata, const return foundAuthError; } -void appendOpTime(BSONObjBuilder* bob, - const char* elemName, - const OpTime& opTime, - const long long pv) { - if (pv == 1) { - opTime.append(bob, elemName); - } else { - bob->append(elemName, opTime.getTimestamp()); - } +void appendOpTime(BSONObjBuilder* bob, const char* elemName, const OpTime& opTime) { + opTime.append(bob, elemName); } } // namespace @@ -709,14 +702,12 @@ HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse( // // Arbiters also decrease their heartbeat interval to at most half the election timeout period. Milliseconds heartbeatInterval = _rsConfig.getHeartbeatInterval(); - if (_rsConfig.getProtocolVersion() == 1) { - if (getMemberState().arbiter()) { - heartbeatInterval = std::min(_rsConfig.getElectionTimeoutPeriod() / 2, - _rsConfig.getHeartbeatInterval()); - } else if (getSyncSourceAddress().empty() && !_iAmPrimary()) { - heartbeatInterval = std::min(_rsConfig.getElectionTimeoutPeriod() / 2, - _rsConfig.getHeartbeatInterval() / 4); - } + if (getMemberState().arbiter()) { + heartbeatInterval = + std::min(_rsConfig.getElectionTimeoutPeriod() / 2, _rsConfig.getHeartbeatInterval()); + } else if (getSyncSourceAddress().empty() && !_iAmPrimary()) { + heartbeatInterval = std::min(_rsConfig.getElectionTimeoutPeriod() / 2, + _rsConfig.getHeartbeatInterval() / 4); } const Milliseconds alreadyElapsed = now - hbStats.getLastHeartbeatStartDate(); @@ -807,7 +798,6 @@ HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse( } HeartbeatResponseAction nextAction; - invariant(_rsConfig.getProtocolVersion() == 1); nextAction = _updatePrimaryFromHBDataV1(memberIndex, originalState, now); nextAction.setNextHeartbeatStartDate(nextHeartbeatStartDate); @@ -1406,7 +1396,7 @@ void TopologyCoordinator::prepareStatusResponse(const ReplSetStatusArgs& rsStatu response->append("stateStr", myState.toString()); response->append("uptime", rsStatusArgs.selfUptime); - appendOpTime(response, "optime", lastOpApplied, _rsConfig.getProtocolVersion()); + appendOpTime(response, "optime", lastOpApplied); response->appendDate("optimeDate", Date_t::fromDurationSinceEpoch(Seconds(lastOpApplied.getSecs()))); @@ -1437,7 +1427,7 @@ void TopologyCoordinator::prepareStatusResponse(const ReplSetStatusArgs& rsStatu bb.append("stateStr", myState.toString()); bb.append("uptime", rsStatusArgs.selfUptime); if (!_selfConfig().isArbiter()) { - appendOpTime(&bb, "optime", lastOpApplied, _rsConfig.getProtocolVersion()); + appendOpTime(&bb, "optime", lastOpApplied); bb.appendDate("optimeDate", Date_t::fromDurationSinceEpoch(Seconds(lastOpApplied.getSecs()))); } @@ -1490,12 +1480,8 @@ void TopologyCoordinator::prepareStatusResponse(const ReplSetStatusArgs& rsStatu it->getUpSince() != Date_t() ? durationCount<Seconds>(now - it->getUpSince()) : 0)); bb.append("uptime", uptime); if (!itConfig.isArbiter()) { - appendOpTime( - &bb, "optime", it->getHeartbeatAppliedOpTime(), _rsConfig.getProtocolVersion()); - appendOpTime(&bb, - "optimeDurable", - it->getHeartbeatDurableOpTime(), - _rsConfig.getProtocolVersion()); + appendOpTime(&bb, "optime", it->getHeartbeatAppliedOpTime()); + appendOpTime(&bb, "optimeDurable", it->getHeartbeatDurableOpTime()); bb.appendDate("optimeDate", Date_t::fromDurationSinceEpoch( @@ -1571,8 +1557,8 @@ void TopologyCoordinator::prepareStatusResponse(const ReplSetStatusArgs& rsStatu rsStatusArgs.readConcernMajorityOpTime.append(&optimes, "readConcernMajorityOpTime"); } - appendOpTime(&optimes, "appliedOpTime", lastOpApplied, _rsConfig.getProtocolVersion()); - appendOpTime(&optimes, "durableOpTime", lastOpDurable, _rsConfig.getProtocolVersion()); + appendOpTime(&optimes, "appliedOpTime", lastOpApplied); + appendOpTime(&optimes, "durableOpTime", lastOpDurable); response->append("optimes", optimes.obj()); if (lastStableRecoveryTimestamp) { // Only include this field if the storage engine supports RTT. @@ -1636,14 +1622,10 @@ void TopologyCoordinator::fillMemberData(BSONObjBuilder* result) { for (const auto& memberData : _memberData) { BSONObjBuilder entry(replicationProgress.subobjStart()); const auto lastDurableOpTime = memberData.getLastDurableOpTime(); - if (_rsConfig.getProtocolVersion() == 1) { - BSONObjBuilder opTime(entry.subobjStart("optime")); - opTime.append("ts", lastDurableOpTime.getTimestamp()); - opTime.append("term", lastDurableOpTime.getTerm()); - opTime.done(); - } else { - entry.append("optime", lastDurableOpTime.getTimestamp()); - } + BSONObjBuilder opTime(entry.subobjStart("optime")); + opTime.append("ts", lastDurableOpTime.getTimestamp()); + opTime.append("term", lastDurableOpTime.getTerm()); + opTime.done(); entry.append("host", memberData.getHostAndPort().toString()); if (_selfIndex >= 0) { const int memberId = memberData.getMemberId(); @@ -1818,15 +1800,9 @@ void TopologyCoordinator::updateConfig(const ReplSetConfig& newConfig, int selfI invariant(_role != Role::kCandidate); invariant(selfIndex < newConfig.getNumMembers()); - // Reset term on startup and upgrade/downgrade of protocol version. - if (!_rsConfig.isInitialized() || - _rsConfig.getProtocolVersion() != newConfig.getProtocolVersion()) { - if (newConfig.getProtocolVersion() == 1) { - _term = OpTime::kInitialTerm; - } else { - invariant(newConfig.getProtocolVersion() == 0); - _term = OpTime::kUninitializedTerm; - } + // Reset term on startup. + if (!_rsConfig.isInitialized()) { + _term = OpTime::kInitialTerm; LOG(1) << "Updated term in topology coordinator to " << _term << " due to new config"; } @@ -1941,8 +1917,6 @@ TopologyCoordinator::UnelectableReasonMask TopologyCoordinator::_getMyUnelectabl result |= NotSecondary; } - // Election rules only for protocol version 1. - invariant(_rsConfig.getProtocolVersion() == 1); if (reason == StartElectionReason::kPriorityTakeover && !_amIFreshEnoughForPriorityTakeover()) { result |= NotCloseEnoughToLatestForPriorityTakeover; } @@ -2488,8 +2462,7 @@ bool TopologyCoordinator::shouldChangeSyncSource( return true; } - if (_rsConfig.getProtocolVersion() == 1 && - replMetadata.getConfigVersion() != _rsConfig.getConfigVersion()) { + if (replMetadata.getConfigVersion() != _rsConfig.getConfigVersion()) { log() << "Choosing new sync source because the config version supplied by " << currentSource << ", " << replMetadata.getConfigVersion() << ", does not match ours, " << _rsConfig.getConfigVersion(); @@ -2497,7 +2470,6 @@ bool TopologyCoordinator::shouldChangeSyncSource( } const int currentSourceIndex = _rsConfig.findMemberIndexByHostAndPort(currentSource); - // PV0 doesn't use metadata, we have to consult _rsConfig. if (currentSourceIndex == -1) { log() << "Choosing new sync source because " << currentSource.toString() << " is not in our config"; @@ -2534,8 +2506,8 @@ bool TopologyCoordinator::shouldChangeSyncSource( // Change sync source if they are not ahead of us, and don't have a sync source, // unless they are primary. const OpTime myLastOpTime = getMyLastAppliedOpTime(); - if (_rsConfig.getProtocolVersion() == 1 && syncSourceIndex == -1 && - currentSourceOpTime <= myLastOpTime && primaryIndex != currentSourceIndex) { + if (syncSourceIndex == -1 && currentSourceOpTime <= myLastOpTime && + primaryIndex != currentSourceIndex) { std::stringstream logMessage; logMessage << "Choosing new sync source because our current sync source, " << currentSource.toString() << ", has an OpTime (" << currentSourceOpTime |