diff options
author | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2016-05-21 01:06:12 -0400 |
---|---|---|
committer | Siyuan Zhou <visualzhou@gmail.com> | 2017-01-10 18:40:52 -0500 |
commit | 0addb5906136dd9e6b0510c659c446751bfab680 (patch) | |
tree | 34259ef9f6d49a628260c48ee064319d31fb4795 | |
parent | 52b68fa86ea43e909ad42c901d0579bced6b205f (diff) | |
download | mongo-0addb5906136dd9e6b0510c659c446751bfab680.tar.gz |
SERVER-22136 Attach term metadata to UpdatePosition command
(cherry picked from commit 551fb26a8ad83c64ea018594c06fbda7002e996c)
Conflicts:
src/mongo/db/dbcommands.cpp
src/mongo/db/repl/replication_coordinator_impl.cpp
src/mongo/db/repl/replication_coordinator_impl.h
src/mongo/db/repl/replication_coordinator_impl_test.cpp
src/mongo/db/repl/replication_coordinator_mock.cpp
src/mongo/db/repl/replication_coordinator_mock.h
src/mongo/db/repl/replset_commands.cpp
src/mongo/db/repl/topology_coordinator_impl.h
-rw-r--r-- | src/mongo/db/dbcommands.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator.h | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 89 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.h | 11 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_test.cpp | 32 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_mock.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_mock.h | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/replset_commands.cpp | 21 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl.h | 6 |
11 files changed, 118 insertions, 80 deletions
diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp index 228c5ea150d..31c453ecaa3 100644 --- a/src/mongo/db/dbcommands.cpp +++ b/src/mongo/db/dbcommands.cpp @@ -92,6 +92,7 @@ #include "mongo/rpc/reply_builder_interface.h" #include "mongo/rpc/metadata.h" #include "mongo/rpc/metadata/config_server_metadata.h" +#include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/rpc/metadata/server_selection_metadata.h" #include "mongo/rpc/metadata/sharding_metadata.h" #include "mongo/rpc/protocol.h" @@ -1466,8 +1467,9 @@ bool Command::run(OperationContext* txn, if (isReplSet) { repl::OpTime lastOpTimeFromClient = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(); - replCoord->prepareReplResponseMetadata(request, lastOpTimeFromClient, &metadataBob); - + if (request.getMetadata().hasField(rpc::kReplSetMetadataFieldName)) { + replCoord->prepareReplMetadata(lastOpTimeFromClient, &metadataBob); + } // For commands from mongos, append some info to help getLastError(w) work. // TODO: refactor out of here as part of SERVER-18326 if (isShardingAware || serverGlobalParams.configsvr) { diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index dd55fcca799..18ccf45a224 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -685,9 +685,8 @@ public: /** * Prepares a metadata object describing the current term, primary, and lastOp information. */ - virtual void prepareReplResponseMetadata(const rpc::RequestInterface& request, - const OpTime& lastOpTimeFromClient, - BSONObjBuilder* builder) = 0; + virtual void prepareReplMetadata(const OpTime& lastOpTimeFromClient, + BSONObjBuilder* builder) const = 0; /** * Returns true if the V1 election protocol is being used and false otherwise. @@ -765,7 +764,7 @@ public: /** * Gets the latest OpTime of the currentCommittedSnapshot. */ - virtual OpTime getCurrentCommittedSnapshotOpTime() = 0; + virtual OpTime getCurrentCommittedSnapshotOpTime() const = 0; /** * Appends connection information to the provided BSONObjBuilder. diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 5439c40f46b..698f810fe6e 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -329,7 +329,7 @@ Date_t ReplicationCoordinatorImpl::getPriorityTakeover_forTest() const { return _priorityTakeoverWhen; } -OpTime ReplicationCoordinatorImpl::getCurrentCommittedSnapshotOpTime() { +OpTime ReplicationCoordinatorImpl::getCurrentCommittedSnapshotOpTime() const { stdx::lock_guard<stdx::mutex> lk(_mutex); if (_currentCommittedSnapshot) { return _currentCommittedSnapshot->opTime; @@ -1846,32 +1846,35 @@ int ReplicationCoordinatorImpl::_getMyId_inlock() const { } bool ReplicationCoordinatorImpl::prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder) { - stdx::lock_guard<stdx::mutex> lock(_mutex); - invariant(_rsConfig.isInitialized()); - // Do not send updates if we have been removed from the config. - if (_selfIndex == -1) { - return false; - } - cmdBuilder->append("replSetUpdatePosition", 1); - // Create an array containing objects each live member connected to us and for ourself. - BSONArrayBuilder arrayBuilder(cmdBuilder->subarrayStart("optimes")); - for (SlaveInfoVector::iterator itr = _slaveInfo.begin(); itr != _slaveInfo.end(); ++itr) { - if (itr->lastAppliedOpTime.isNull()) { - // Don't include info on members we haven't heard from yet. - continue; - } - // Don't include members we think are down. - if (!itr->self && itr->down) { - continue; + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + invariant(_rsConfig.isInitialized()); + // Do not send updates if we have been removed from the config. + if (_selfIndex == -1) { + return false; } + cmdBuilder->append("replSetUpdatePosition", 1); + // Create an array containing objects each live member connected to us and for ourself. + BSONArrayBuilder arrayBuilder(cmdBuilder->subarrayStart("optimes")); + for (SlaveInfoVector::iterator itr = _slaveInfo.begin(); itr != _slaveInfo.end(); ++itr) { + if (itr->lastAppliedOpTime.isNull()) { + // Don't include info on members we haven't heard from yet. + continue; + } + // Don't include members we think are down. + if (!itr->self && itr->down) { + continue; + } - BSONObjBuilder entry(arrayBuilder.subobjStart()); - itr->lastDurableOpTime.append(&entry, "durableOpTime"); - itr->lastAppliedOpTime.append(&entry, "appliedOpTime"); - entry.append("memberId", itr->memberId); - entry.append("cfgver", _rsConfig.getConfigVersion()); + BSONObjBuilder entry(arrayBuilder.subobjStart()); + itr->lastDurableOpTime.append(&entry, "durableOpTime"); + itr->lastAppliedOpTime.append(&entry, "appliedOpTime"); + entry.append("memberId", itr->memberId); + entry.append("cfgver", _rsConfig.getConfigVersion()); + } } - + // Add metadata to command. Old style parsing logic will reject the metadata. + prepareReplMetadata(OpTime(), cmdBuilder); return true; } @@ -3267,37 +3270,35 @@ void ReplicationCoordinatorImpl::_processReplSetDeclareElectionWinner_finish( *result = _topCoord->processReplSetDeclareElectionWinner(args, responseTerm); } -void ReplicationCoordinatorImpl::prepareReplResponseMetadata(const rpc::RequestInterface& request, - const OpTime& lastOpTimeFromClient, - BSONObjBuilder* builder) { - if (request.getMetadata().hasField(rpc::kReplSetMetadataFieldName)) { - rpc::ReplSetMetadata metadata; - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_prepareReplResponseMetadata_finish, - this, - stdx::placeholders::_1, - lastOpTimeFromClient, - &metadata)); +void ReplicationCoordinatorImpl::prepareReplMetadata(const OpTime& lastOpTimeFromClient, + BSONObjBuilder* builder) const { + rpc::ReplSetMetadata metadata; - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return; - } - - fassert(28709, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); + CBHStatus cbh = _replExecutor.scheduleWork( + stdx::bind(&ReplicationCoordinatorImpl::_prepareReplResponseMetadata_finish, + this, + stdx::placeholders::_1, + lastOpTimeFromClient, + &metadata)); - metadata.writeToMetadata(builder); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return; } + + fassert(28709, cbh.getStatus()); + _replExecutor.wait(cbh.getValue()); + + metadata.writeToMetadata(builder); } void ReplicationCoordinatorImpl::_prepareReplResponseMetadata_finish( const ReplicationExecutor::CallbackArgs& cbData, const OpTime& lastOpTimeFromClient, - rpc::ReplSetMetadata* metadata) { + rpc::ReplSetMetadata* metadata) const { OpTime lastReadableOpTime = getCurrentCommittedSnapshotOpTime(); OpTime lastVisibleOpTime = std::max(lastOpTimeFromClient, lastReadableOpTime); - _topCoord->prepareReplResponseMetadata(metadata, lastVisibleOpTime, _lastCommittedOpTime); + _topCoord->prepareReplMetadata(metadata, lastVisibleOpTime, _lastCommittedOpTime); } bool ReplicationCoordinatorImpl::isV1ElectionProtocol() { diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 33b93a70027..3c6bbaefdbb 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -290,9 +290,8 @@ public: virtual Status processReplSetDeclareElectionWinner(const ReplSetDeclareElectionWinnerArgs& args, long long* responseTerm) override; - void prepareReplResponseMetadata(const rpc::RequestInterface&, - const OpTime& lastOpTimeFromClient, - BSONObjBuilder* builder) override; + virtual void prepareReplMetadata(const OpTime& lastOpTimeFromClient, + BSONObjBuilder* builder) const override; virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args, ReplSetHeartbeatResponse* response) override; @@ -317,7 +316,7 @@ public: virtual void onSnapshotCreate(OpTime timeOfSnapshot, SnapshotName name) override; - virtual OpTime getCurrentCommittedSnapshotOpTime() override; + virtual OpTime getCurrentCommittedSnapshotOpTime() const override; virtual void waitUntilSnapshotCommitted(OperationContext* txn, const SnapshotName& untilSnapshot) override; @@ -663,11 +662,11 @@ private: Status* result); /** - * Bottom half of prepareReplResponseMetadata. + * Bottom half of prepareReplMetadata. */ void _prepareReplResponseMetadata_finish(const ReplicationExecutor::CallbackArgs& cbData, const OpTime& lastOpTimeFromClient, - rpc::ReplSetMetadata* metadata); + rpc::ReplSetMetadata* metadata) const; /** * Scheduled to cause the ReplicationCoordinator to reconsider any state that might * need to change as a result of time passing - for instance becoming PRIMARY when a single diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index 2c791030e76..91fde3615b0 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -2038,7 +2038,7 @@ TEST_F(ReplCoordTest, NodeIncludesOtherMembersProgressInUpdatePositionCommand) { getReplCoord()->prepareReplSetUpdatePositionCommand(&cmdBuilder); BSONObj cmd = cmdBuilder.done(); - ASSERT_EQUALS(2, cmd.nFields()); + ASSERT_EQUALS(3, cmd.nFields()); ASSERT_EQUALS("replSetUpdatePosition", cmd.firstElement().fieldNameStringData()); std::set<long long> memberIds; @@ -4221,6 +4221,36 @@ TEST_F(ReplCoordTest, OnlyForwardSyncProgressForOtherNodesWhenTheNodesAreBelieve ASSERT_EQUALS(1U, memberIds4.size()); } +TEST_F(ReplCoordTest, NewStyleUpdatePositionCmdHasMetadata) { + assertStartSuccess( + BSON("_id" + << "mySet" + << "version" << 1 << "members" + << BSON_ARRAY(BSON("_id" << 0 << "host" + << "test1:1234") + << BSON("_id" << 1 << "host" + << "test2:1234") << BSON("_id" << 2 << "host" + << "test3:1234")) + << "protocolVersion" << 1 << "settings" + << BSON("electionTimeoutMillis" << 2000 << "heartbeatIntervalMillis" << 40000)), + HostAndPort("test1", 1234)); + OpTime optime(Timestamp(100, 2), 0); + getReplCoord()->setMyLastAppliedOpTime(optime); + getReplCoord()->setMyLastDurableOpTime(optime); + + // Set last committed optime via metadata. + rpc::ReplSetMetadata syncSourceMetadata(optime.getTerm(), optime, optime, 1, OID(), -1, 1); + getReplCoord()->processReplSetMetadata(syncSourceMetadata); + getReplCoord()->onSnapshotCreate(optime, SnapshotName(1)); + + BSONObjBuilder cmdBuilder; + ASSERT_TRUE(getReplCoord()->prepareReplSetUpdatePositionCommand(&cmdBuilder)); + BSONObj cmd = cmdBuilder.obj(); + auto metadata = unittest::assertGet(rpc::ReplSetMetadata::readFromMetadata(cmd)); + ASSERT_EQUALS(metadata.getTerm(), getReplCoord()->getTerm()); + ASSERT_EQUALS(metadata.getLastOpVisible(), optime); +} + TEST_F(ReplCoordTest, StepDownWhenHandleLivenessTimeoutMarksAMajorityOfVotingNodesDown) { assertStartSuccess( BSON("_id" diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 7d3f987878f..cb763bed4ff 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -381,9 +381,8 @@ Status ReplicationCoordinatorMock::processReplSetDeclareElectionWinner( return Status::OK(); } -void ReplicationCoordinatorMock::prepareReplResponseMetadata(const rpc::RequestInterface& request, - const OpTime& lastOpTimeFromClient, - BSONObjBuilder* builder) {} +void ReplicationCoordinatorMock::prepareReplMetadata(const OpTime& lastOpTimeFromClient, + BSONObjBuilder* builder) const {} Status ReplicationCoordinatorMock::processHeartbeatV1(const ReplSetHeartbeatArgsV1& args, ReplSetHeartbeatResponse* response) { @@ -418,7 +417,7 @@ void ReplicationCoordinatorMock::onSnapshotCreate(OpTime timeOfSnapshot, Snapsho void ReplicationCoordinatorMock::dropAllSnapshots() {} -OpTime ReplicationCoordinatorMock::getCurrentCommittedSnapshotOpTime() { +OpTime ReplicationCoordinatorMock::getCurrentCommittedSnapshotOpTime() const { return OpTime(); } diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 15315cd5a8f..861799f00d2 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -217,9 +217,8 @@ public: virtual Status processReplSetDeclareElectionWinner(const ReplSetDeclareElectionWinnerArgs& args, long long* responseTerm); - void prepareReplResponseMetadata(const rpc::RequestInterface& request, - const OpTime& lastOpTimeFromClient, - BSONObjBuilder* builder) override; + void prepareReplMetadata(const OpTime& lastOpTimeFromClient, + BSONObjBuilder* builder) const override; virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args, ReplSetHeartbeatResponse* response); @@ -242,7 +241,7 @@ public: virtual void dropAllSnapshots() override; - virtual OpTime getCurrentCommittedSnapshotOpTime() override; + virtual OpTime getCurrentCommittedSnapshotOpTime() const override; virtual void waitUntilSnapshotCommitted(OperationContext* txn, const SnapshotName& untilSnapshot) override; diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp index ee0aa50a6f0..e6d90aa82ec 100644 --- a/src/mongo/db/repl/replset_commands.cpp +++ b/src/mongo/db/repl/replset_commands.cpp @@ -58,6 +58,7 @@ #include "mongo/db/service_context.h" #include "mongo/db/storage/storage_engine.h" #include "mongo/executor/network_interface.h" +#include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" @@ -664,7 +665,9 @@ public: int, string& errmsg, BSONObjBuilder& result) { - Status status = getGlobalReplicationCoordinator()->checkReplEnabledForCommand(&result); + auto replCoord = repl::ReplicationCoordinator::get(txn->getClient()->getServiceContext()); + + Status status = replCoord->checkReplEnabledForCommand(&result); if (!status.isOK()) return appendCommandStatus(result, status); @@ -673,6 +676,14 @@ public: if (cmdObj.hasField("handshake")) return true; + auto metadataResult = rpc::ReplSetMetadata::readFromMetadata(cmdObj); + if (metadataResult.isOK()) { + // New style update position command has metadata, which may inform the + // upstream of a higher term. + auto metadata = metadataResult.getValue(); + replCoord->processReplSetMetadata(metadata); + } + // In the case of an update from a member with an invalid replica set config, // we return our current config version. long long configVersion = -1; @@ -681,9 +692,8 @@ public: status = args.initialize(cmdObj); if (status.isOK()) { - // v3.2.2+ style replSetUpdatePosition command. - status = getGlobalReplicationCoordinator()->processReplSetUpdatePosition( - args, &configVersion); + // v3.2.4+ style replSetUpdatePosition command. + status = replCoord->processReplSetUpdatePosition(args, &configVersion); if (status == ErrorCodes::InvalidReplicaSetConfig) { result.append("configVersion", configVersion); @@ -696,8 +706,7 @@ public: if (!status.isOK()) return appendCommandStatus(result, status); - status = getGlobalReplicationCoordinator()->processReplSetUpdatePosition( - oldArgs, &configVersion); + status = replCoord->processReplSetUpdatePosition(oldArgs, &configVersion); if (status == ErrorCodes::InvalidReplicaSetConfig) { result.append("configVersion", configVersion); diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index 5e8cda75505..3a51485052a 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -422,9 +422,9 @@ public: /** * Prepares a BSONObj describing the current term, primary, and lastOp information. */ - virtual void prepareReplResponseMetadata(rpc::ReplSetMetadata* metadata, - const OpTime& lastVisibleOpTime, - const OpTime& lastCommittedOpTime) const = 0; + virtual void prepareReplMetadata(rpc::ReplSetMetadata* metadata, + const OpTime& lastVisibleOpTime, + const OpTime& lastCommittedOpTime) const = 0; /** * Writes into 'output' all the information needed to generate a summary of the current diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp index afd720d7869..b235cbf5482 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl.cpp @@ -2419,9 +2419,9 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS return false; } -void TopologyCoordinatorImpl::prepareReplResponseMetadata(rpc::ReplSetMetadata* metadata, - const OpTime& lastVisibleOpTime, - const OpTime& lastCommittedOpTime) const { +void TopologyCoordinatorImpl::prepareReplMetadata(rpc::ReplSetMetadata* metadata, + const OpTime& lastVisibleOpTime, + const OpTime& lastCommittedOpTime) const { *metadata = rpc::ReplSetMetadata(_term, lastCommittedOpTime, diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h index 21b9799fca2..3645ebe1aaf 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.h +++ b/src/mongo/db/repl/topology_coordinator_impl.h @@ -225,9 +225,9 @@ public: const OpTime& lastOpCommitted); virtual bool stepDownIfPending(); virtual Date_t getStepDownTime() const; - virtual void prepareReplResponseMetadata(rpc::ReplSetMetadata* metadata, - const OpTime& lastVisibleOpTime, - const OpTime& lastCommitttedOpTime) const; + virtual void prepareReplMetadata(rpc::ReplSetMetadata* metadata, + const OpTime& lastVisibleOpTime, + const OpTime& lastCommitttedOpTime) const; Status processReplSetDeclareElectionWinner(const ReplSetDeclareElectionWinnerArgs& args, long long* responseTerm); virtual void processReplSetRequestVotes(const ReplSetRequestVotesArgs& args, |