diff options
author | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2016-05-21 01:06:12 -0400 |
---|---|---|
committer | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2016-06-01 16:39:42 -0400 |
commit | 551fb26a8ad83c64ea018594c06fbda7002e996c (patch) | |
tree | 362abcc051bb10586bd299b2004b744c4fb141b8 | |
parent | 8d41cd981f778d3f9d4e3add11ee26bd81f4fb11 (diff) | |
download | mongo-551fb26a8ad83c64ea018594c06fbda7002e996c.tar.gz |
SERVER-22136 Attach term metadata to UpdatePosition command
-rw-r--r-- | src/mongo/db/dbcommands.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator.h | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 103 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.h | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_test.cpp | 36 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_mock.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_mock.h | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/replset_commands.cpp | 18 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl.h | 6 |
12 files changed, 128 insertions, 85 deletions
diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp index a7def1cb6c5..4b5df659bd7 100644 --- a/src/mongo/db/dbcommands.cpp +++ b/src/mongo/db/dbcommands.cpp @@ -92,6 +92,7 @@ #include "mongo/db/write_concern.h" #include "mongo/rpc/metadata.h" #include "mongo/rpc/metadata/config_server_metadata.h" +#include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/rpc/metadata/server_selection_metadata.h" #include "mongo/rpc/metadata/sharding_metadata.h" #include "mongo/rpc/protocol.h" @@ -1237,8 +1238,9 @@ void appendOpTimeMetadata(OperationContext* txn, // Attach our own last opTime. repl::OpTime lastOpTimeFromClient = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(); - replCoord->prepareReplResponseMetadata(request, lastOpTimeFromClient, metadataBob); - + if (request.getMetadata().hasField(rpc::kReplSetMetadataFieldName)) { + replCoord->prepareReplMetadata(lastOpTimeFromClient, metadataBob); + } // For commands from mongos, append some info to help getLastError(w) work. // TODO: refactor out of here as part of SERVER-18236 if (isShardingAware || isConfig) { diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index 0f226a6b13b..3e915211977 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -679,9 +679,8 @@ public: /** * Prepares a metadata object describing the current term, primary, and lastOp information. */ - virtual void prepareReplResponseMetadata(const rpc::RequestInterface& request, - const OpTime& lastOpTimeFromClient, - BSONObjBuilder* builder) = 0; + virtual void prepareReplMetadata(const OpTime& lastOpTimeFromClient, + BSONObjBuilder* builder) const = 0; /** * Returns true if the V1 election protocol is being used and false otherwise. @@ -759,7 +758,7 @@ public: /** * Gets the latest OpTime of the currentCommittedSnapshot. */ - virtual OpTime getCurrentCommittedSnapshotOpTime() = 0; + virtual OpTime getCurrentCommittedSnapshotOpTime() const = 0; /** * Appends connection information to the provided BSONObjBuilder. diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index b6c89737b5f..d3d80f5ce65 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -338,7 +338,7 @@ Date_t ReplicationCoordinatorImpl::getPriorityTakeover_forTest() const { return _priorityTakeoverWhen; } -OpTime ReplicationCoordinatorImpl::getCurrentCommittedSnapshotOpTime() { +OpTime ReplicationCoordinatorImpl::getCurrentCommittedSnapshotOpTime() const { stdx::lock_guard<stdx::mutex> lk(_mutex); if (_currentCommittedSnapshot) { return _currentCommittedSnapshot->opTime; @@ -1889,49 +1889,55 @@ int ReplicationCoordinatorImpl::_getMyId_inlock() const { StatusWith<BSONObj> ReplicationCoordinatorImpl::prepareReplSetUpdatePositionCommand( ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle) const { - stdx::lock_guard<stdx::mutex> lock(_mutex); - invariant(_rsConfig.isInitialized()); - // Do not send updates if we have been removed from the config. - if (_selfIndex == -1) { - return Status(ErrorCodes::NodeNotFound, - "This node is not in the current replset configuration."); - } BSONObjBuilder cmdBuilder; - cmdBuilder.append(UpdatePositionArgs::kCommandFieldName, 1); - // Create an array containing objects each live member connected to us and for ourself. - BSONArrayBuilder arrayBuilder(cmdBuilder.subarrayStart("optimes")); - for (const auto& slaveInfo : _slaveInfo) { - if (slaveInfo.lastAppliedOpTime.isNull()) { - // Don't include info on members we haven't heard from yet. - continue; - } - // Don't include members we think are down. - if (!slaveInfo.self && slaveInfo.down) { - continue; - } + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + invariant(_rsConfig.isInitialized()); + // Do not send updates if we have been removed from the config. + if (_selfIndex == -1) { + return Status(ErrorCodes::NodeNotFound, + "This node is not in the current replset configuration."); + } + cmdBuilder.append(UpdatePositionArgs::kCommandFieldName, 1); + // Create an array containing objects each live member connected to us and for ourself. + BSONArrayBuilder arrayBuilder(cmdBuilder.subarrayStart("optimes")); + for (const auto& slaveInfo : _slaveInfo) { + if (slaveInfo.lastAppliedOpTime.isNull()) { + // Don't include info on members we haven't heard from yet. + continue; + } + // Don't include members we think are down. + if (!slaveInfo.self && slaveInfo.down) { + continue; + } - BSONObjBuilder entry(arrayBuilder.subobjStart()); - switch (commandStyle) { - case ReplSetUpdatePositionCommandStyle::kNewStyle: - slaveInfo.lastDurableOpTime.append(&entry, - UpdatePositionArgs::kDurableOpTimeFieldName); - slaveInfo.lastAppliedOpTime.append(&entry, - UpdatePositionArgs::kAppliedOpTimeFieldName); - break; - case ReplSetUpdatePositionCommandStyle::kOldStyle: - entry.append("_id", slaveInfo.rid); - if (isV1ElectionProtocol()) { - slaveInfo.lastDurableOpTime.append(&entry, "optime"); - } else { - entry.append("optime", slaveInfo.lastDurableOpTime.getTimestamp()); - } - break; + BSONObjBuilder entry(arrayBuilder.subobjStart()); + switch (commandStyle) { + case ReplSetUpdatePositionCommandStyle::kNewStyle: + slaveInfo.lastDurableOpTime.append(&entry, + UpdatePositionArgs::kDurableOpTimeFieldName); + slaveInfo.lastAppliedOpTime.append(&entry, + UpdatePositionArgs::kAppliedOpTimeFieldName); + break; + case ReplSetUpdatePositionCommandStyle::kOldStyle: + entry.append("_id", slaveInfo.rid); + if (isV1ElectionProtocol()) { + slaveInfo.lastDurableOpTime.append(&entry, "optime"); + } else { + entry.append("optime", slaveInfo.lastDurableOpTime.getTimestamp()); + } + break; + } + entry.append(UpdatePositionArgs::kMemberIdFieldName, slaveInfo.memberId); + entry.append(UpdatePositionArgs::kConfigVersionFieldName, _rsConfig.getConfigVersion()); } - entry.append(UpdatePositionArgs::kMemberIdFieldName, slaveInfo.memberId); - entry.append(UpdatePositionArgs::kConfigVersionFieldName, _rsConfig.getConfigVersion()); + arrayBuilder.done(); } - arrayBuilder.done(); + // Add metadata to command. Old style parsing logic will reject the metadata. + if (commandStyle == ReplSetUpdatePositionCommandStyle::kNewStyle) { + prepareReplMetadata(OpTime(), &cmdBuilder); + } return cmdBuilder.obj(); } @@ -3121,18 +3127,15 @@ Status ReplicationCoordinatorImpl::processReplSetRequestVotes( return Status::OK(); } -void ReplicationCoordinatorImpl::prepareReplResponseMetadata(const rpc::RequestInterface& request, - const OpTime& lastOpTimeFromClient, - BSONObjBuilder* builder) { - if (request.getMetadata().hasField(rpc::kReplSetMetadataFieldName)) { - rpc::ReplSetMetadata metadata; - LockGuard topoLock(_topoMutex); +void ReplicationCoordinatorImpl::prepareReplMetadata(const OpTime& lastOpTimeFromClient, + BSONObjBuilder* builder) const { + rpc::ReplSetMetadata metadata; + LockGuard topoLock(_topoMutex); - OpTime lastReadableOpTime = getCurrentCommittedSnapshotOpTime(); - OpTime lastVisibleOpTime = std::max(lastOpTimeFromClient, lastReadableOpTime); - _topCoord->prepareReplResponseMetadata(&metadata, lastVisibleOpTime, _lastCommittedOpTime); - metadata.writeToMetadata(builder); - } + OpTime lastReadableOpTime = getCurrentCommittedSnapshotOpTime(); + OpTime lastVisibleOpTime = std::max(lastOpTimeFromClient, lastReadableOpTime); + _topCoord->prepareReplMetadata(&metadata, lastVisibleOpTime, _lastCommittedOpTime); + metadata.writeToMetadata(builder); } bool ReplicationCoordinatorImpl::isV1ElectionProtocol() const { diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index bfa93ac3459..41c11a156d7 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -286,9 +286,8 @@ public: const ReplSetRequestVotesArgs& args, ReplSetRequestVotesResponse* response) override; - void prepareReplResponseMetadata(const rpc::RequestInterface&, - const OpTime& lastOpTimeFromClient, - BSONObjBuilder* builder) override; + void prepareReplMetadata(const OpTime& lastOpTimeFromClient, + BSONObjBuilder* builder) const override; virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args, ReplSetHeartbeatResponse* response) override; @@ -313,7 +312,7 @@ public: virtual void onSnapshotCreate(OpTime timeOfSnapshot, SnapshotName name) override; - virtual OpTime getCurrentCommittedSnapshotOpTime() override; + virtual OpTime getCurrentCommittedSnapshotOpTime() const override; virtual void waitUntilSnapshotCommitted(OperationContext* txn, const SnapshotName& untilSnapshot) override; diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index 4c8945ac064..11ea4cf40ba 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -155,8 +155,8 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse( replMetadata = responseStatus; } if (replMetadata.isOK()) { - // Asynchronous stepdown could happen, but it will be queued in executor after - // this function, so we cannot and don't need to wait for it to finish. + // Asynchronous stepdown could happen, but it will wait for _topoMutex and execute + // after this function, so we cannot and don't need to wait for it to finish. _processReplSetMetadata_incallback(replMetadata.getValue()); } } diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index 0343865aa6c..c40542ebc5e 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -1911,7 +1911,7 @@ TEST_F(ReplCoordTest, NodeIncludesOtherMembersProgressInUpdatePositionCommand) { BSONObj cmd = unittest::assertGet(getReplCoord()->prepareReplSetUpdatePositionCommand( ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle)); - ASSERT_EQUALS(2, cmd.nFields()); + ASSERT_EQUALS(3, cmd.nFields()); ASSERT_EQUALS(UpdatePositionArgs::kCommandFieldName, cmd.firstElement().fieldNameStringData()); std::set<long long> memberIds; @@ -4483,6 +4483,40 @@ TEST_F(ReplCoordTest, OnlyForwardSyncProgressForOtherNodesWhenTheNodesAreBelieve ASSERT_EQUALS(1U, memberIds4.size()); } +TEST_F(ReplCoordTest, NewStyleUpdatePositionCmdHasMetadata) { + assertStartSuccess( + BSON("_id" + << "mySet" + << "version" + << 1 + << "members" + << BSON_ARRAY(BSON("_id" << 0 << "host" + << "test1:1234") + << BSON("_id" << 1 << "host" + << "test2:1234") + << BSON("_id" << 2 << "host" + << "test3:1234")) + << "protocolVersion" + << 1 + << "settings" + << BSON("electionTimeoutMillis" << 2000 << "heartbeatIntervalMillis" << 40000)), + HostAndPort("test1", 1234)); + OpTime optime(Timestamp(100, 2), 0); + getReplCoord()->setMyLastAppliedOpTime(optime); + getReplCoord()->setMyLastDurableOpTime(optime); + + // Set last committed optime via metadata. + rpc::ReplSetMetadata syncSourceMetadata(optime.getTerm(), optime, optime, 1, OID(), -1, 1); + getReplCoord()->processReplSetMetadata(syncSourceMetadata); + getReplCoord()->onSnapshotCreate(optime, SnapshotName(1)); + + BSONObj cmd = unittest::assertGet(getReplCoord()->prepareReplSetUpdatePositionCommand( + ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle)); + auto metadata = unittest::assertGet(rpc::ReplSetMetadata::readFromMetadata(cmd)); + ASSERT_EQUALS(metadata.getTerm(), getReplCoord()->getTerm()); + ASSERT_EQUALS(metadata.getLastOpVisible(), optime); +} + TEST_F(ReplCoordTest, StepDownWhenHandleLivenessTimeoutMarksAMajorityOfVotingNodesDown) { assertStartSuccess( BSON("_id" diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 75efdf63845..ca59fd1a833 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -377,9 +377,8 @@ Status ReplicationCoordinatorMock::processReplSetRequestVotes( return Status::OK(); } -void ReplicationCoordinatorMock::prepareReplResponseMetadata(const rpc::RequestInterface& request, - const OpTime& lastOpTimeFromClient, - BSONObjBuilder* builder) {} +void ReplicationCoordinatorMock::prepareReplMetadata(const OpTime& lastOpTimeFromClient, + BSONObjBuilder* builder) const {} Status ReplicationCoordinatorMock::processHeartbeatV1(const ReplSetHeartbeatArgsV1& args, ReplSetHeartbeatResponse* response) { @@ -414,7 +413,7 @@ void ReplicationCoordinatorMock::onSnapshotCreate(OpTime timeOfSnapshot, Snapsho void ReplicationCoordinatorMock::dropAllSnapshots() {} -OpTime ReplicationCoordinatorMock::getCurrentCommittedSnapshotOpTime() { +OpTime ReplicationCoordinatorMock::getCurrentCommittedSnapshotOpTime() const { return OpTime(); } diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 3ace717f8c2..d2343d883ea 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -216,9 +216,8 @@ public: const ReplSetRequestVotesArgs& args, ReplSetRequestVotesResponse* response); - void prepareReplResponseMetadata(const rpc::RequestInterface& request, - const OpTime& lastOpTimeFromClient, - BSONObjBuilder* builder) override; + void prepareReplMetadata(const OpTime& lastOpTimeFromClient, + BSONObjBuilder* builder) const override; virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args, ReplSetHeartbeatResponse* response); @@ -241,7 +240,7 @@ public: virtual void dropAllSnapshots() override; - virtual OpTime getCurrentCommittedSnapshotOpTime() override; + virtual OpTime getCurrentCommittedSnapshotOpTime() const override; virtual void waitUntilSnapshotCommitted(OperationContext* txn, const SnapshotName& untilSnapshot) override; diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp index c91e28b71a6..0b26a4360ac 100644 --- a/src/mongo/db/repl/replset_commands.cpp +++ b/src/mongo/db/repl/replset_commands.cpp @@ -658,7 +658,9 @@ public: int, string& errmsg, BSONObjBuilder& result) { - Status status = getGlobalReplicationCoordinator()->checkReplEnabledForCommand(&result); + auto replCoord = repl::ReplicationCoordinator::get(txn->getClient()->getServiceContext()); + + Status status = replCoord->checkReplEnabledForCommand(&result); if (!status.isOK()) return appendCommandStatus(result, status); @@ -667,6 +669,14 @@ public: if (cmdObj.hasField("handshake")) return true; + auto metadataResult = rpc::ReplSetMetadata::readFromMetadata(cmdObj); + if (metadataResult.isOK()) { + // New style update position command has metadata, which may inform the + // upstream of a higher term. + auto metadata = metadataResult.getValue(); + replCoord->processReplSetMetadata(metadata); + } + // In the case of an update from a member with an invalid replica set config, // we return our current config version. long long configVersion = -1; @@ -676,8 +686,7 @@ public: status = args.initialize(cmdObj); if (status.isOK()) { // v3.2.4+ style replSetUpdatePosition command. - status = getGlobalReplicationCoordinator()->processReplSetUpdatePosition( - args, &configVersion); + status = replCoord->processReplSetUpdatePosition(args, &configVersion); if (status == ErrorCodes::InvalidReplicaSetConfig) { result.append("configVersion", configVersion); @@ -690,8 +699,7 @@ public: if (!status.isOK()) return appendCommandStatus(result, status); - status = getGlobalReplicationCoordinator()->processReplSetUpdatePosition( - oldArgs, &configVersion); + status = replCoord->processReplSetUpdatePosition(oldArgs, &configVersion); if (status == ErrorCodes::InvalidReplicaSetConfig) { result.append("configVersion", configVersion); diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index 1548cb774a9..72c21024684 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -414,9 +414,9 @@ public: /** * Prepares a BSONObj describing the current term, primary, and lastOp information. */ - virtual void prepareReplResponseMetadata(rpc::ReplSetMetadata* metadata, - const OpTime& lastVisibleOpTime, - const OpTime& lastCommittedOpTime) const = 0; + virtual void prepareReplMetadata(rpc::ReplSetMetadata* metadata, + const OpTime& lastVisibleOpTime, + const OpTime& lastCommittedOpTime) const = 0; /** * Writes into 'output' all the information needed to generate a summary of the current diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp index fb3c5aaec98..8d1ed0e9992 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl.cpp @@ -2370,9 +2370,9 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS return false; } -void TopologyCoordinatorImpl::prepareReplResponseMetadata(rpc::ReplSetMetadata* metadata, - const OpTime& lastVisibleOpTime, - const OpTime& lastCommittedOpTime) const { +void TopologyCoordinatorImpl::prepareReplMetadata(rpc::ReplSetMetadata* metadata, + const OpTime& lastVisibleOpTime, + const OpTime& lastCommittedOpTime) const { *metadata = rpc::ReplSetMetadata(_term, lastCommittedOpTime, diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h index d3eb233acb8..fefebd6d9db 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.h +++ b/src/mongo/db/repl/topology_coordinator_impl.h @@ -217,9 +217,9 @@ public: virtual bool stepDown(Date_t until, bool force, const OpTime& lastOpApplied); virtual bool stepDownIfPending(); virtual Date_t getStepDownTime() const; - virtual void prepareReplResponseMetadata(rpc::ReplSetMetadata* metadata, - const OpTime& lastVisibleOpTime, - const OpTime& lastCommitttedOpTime) const; + virtual void prepareReplMetadata(rpc::ReplSetMetadata* metadata, + const OpTime& lastVisibleOpTime, + const OpTime& lastCommitttedOpTime) const; virtual void processReplSetRequestVotes(const ReplSetRequestVotesArgs& args, ReplSetRequestVotesResponse* response, const OpTime& lastAppliedOpTime); |