summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSiyuan Zhou <siyuan.zhou@mongodb.com>2016-05-21 01:06:12 -0400
committerSiyuan Zhou <visualzhou@gmail.com>2017-01-10 18:40:52 -0500
commit0addb5906136dd9e6b0510c659c446751bfab680 (patch)
tree34259ef9f6d49a628260c48ee064319d31fb4795
parent52b68fa86ea43e909ad42c901d0579bced6b205f (diff)
downloadmongo-0addb5906136dd9e6b0510c659c446751bfab680.tar.gz
SERVER-22136 Attach term metadata to UpdatePosition command
(cherry picked from commit 551fb26a8ad83c64ea018594c06fbda7002e996c) Conflicts: src/mongo/db/dbcommands.cpp src/mongo/db/repl/replication_coordinator_impl.cpp src/mongo/db/repl/replication_coordinator_impl.h src/mongo/db/repl/replication_coordinator_impl_test.cpp src/mongo/db/repl/replication_coordinator_mock.cpp src/mongo/db/repl/replication_coordinator_mock.h src/mongo/db/repl/replset_commands.cpp src/mongo/db/repl/topology_coordinator_impl.h
-rw-r--r--src/mongo/db/dbcommands.cpp6
-rw-r--r--src/mongo/db/repl/replication_coordinator.h7
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp89
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h11
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_test.cpp32
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.cpp7
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.h7
-rw-r--r--src/mongo/db/repl/replset_commands.cpp21
-rw-r--r--src/mongo/db/repl/topology_coordinator.h6
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl.cpp6
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl.h6
11 files changed, 118 insertions, 80 deletions
diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp
index 228c5ea150d..31c453ecaa3 100644
--- a/src/mongo/db/dbcommands.cpp
+++ b/src/mongo/db/dbcommands.cpp
@@ -92,6 +92,7 @@
#include "mongo/rpc/reply_builder_interface.h"
#include "mongo/rpc/metadata.h"
#include "mongo/rpc/metadata/config_server_metadata.h"
+#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/rpc/metadata/server_selection_metadata.h"
#include "mongo/rpc/metadata/sharding_metadata.h"
#include "mongo/rpc/protocol.h"
@@ -1466,8 +1467,9 @@ bool Command::run(OperationContext* txn,
if (isReplSet) {
repl::OpTime lastOpTimeFromClient =
repl::ReplClientInfo::forClient(txn->getClient()).getLastOp();
- replCoord->prepareReplResponseMetadata(request, lastOpTimeFromClient, &metadataBob);
-
+ if (request.getMetadata().hasField(rpc::kReplSetMetadataFieldName)) {
+ replCoord->prepareReplMetadata(lastOpTimeFromClient, &metadataBob);
+ }
// For commands from mongos, append some info to help getLastError(w) work.
// TODO: refactor out of here as part of SERVER-18326
if (isShardingAware || serverGlobalParams.configsvr) {
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index dd55fcca799..18ccf45a224 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -685,9 +685,8 @@ public:
/**
* Prepares a metadata object describing the current term, primary, and lastOp information.
*/
- virtual void prepareReplResponseMetadata(const rpc::RequestInterface& request,
- const OpTime& lastOpTimeFromClient,
- BSONObjBuilder* builder) = 0;
+ virtual void prepareReplMetadata(const OpTime& lastOpTimeFromClient,
+ BSONObjBuilder* builder) const = 0;
/**
* Returns true if the V1 election protocol is being used and false otherwise.
@@ -765,7 +764,7 @@ public:
/**
* Gets the latest OpTime of the currentCommittedSnapshot.
*/
- virtual OpTime getCurrentCommittedSnapshotOpTime() = 0;
+ virtual OpTime getCurrentCommittedSnapshotOpTime() const = 0;
/**
* Appends connection information to the provided BSONObjBuilder.
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 5439c40f46b..698f810fe6e 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -329,7 +329,7 @@ Date_t ReplicationCoordinatorImpl::getPriorityTakeover_forTest() const {
return _priorityTakeoverWhen;
}
-OpTime ReplicationCoordinatorImpl::getCurrentCommittedSnapshotOpTime() {
+OpTime ReplicationCoordinatorImpl::getCurrentCommittedSnapshotOpTime() const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_currentCommittedSnapshot) {
return _currentCommittedSnapshot->opTime;
@@ -1846,32 +1846,35 @@ int ReplicationCoordinatorImpl::_getMyId_inlock() const {
}
bool ReplicationCoordinatorImpl::prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- invariant(_rsConfig.isInitialized());
- // Do not send updates if we have been removed from the config.
- if (_selfIndex == -1) {
- return false;
- }
- cmdBuilder->append("replSetUpdatePosition", 1);
- // Create an array containing objects each live member connected to us and for ourself.
- BSONArrayBuilder arrayBuilder(cmdBuilder->subarrayStart("optimes"));
- for (SlaveInfoVector::iterator itr = _slaveInfo.begin(); itr != _slaveInfo.end(); ++itr) {
- if (itr->lastAppliedOpTime.isNull()) {
- // Don't include info on members we haven't heard from yet.
- continue;
- }
- // Don't include members we think are down.
- if (!itr->self && itr->down) {
- continue;
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ invariant(_rsConfig.isInitialized());
+ // Do not send updates if we have been removed from the config.
+ if (_selfIndex == -1) {
+ return false;
}
+ cmdBuilder->append("replSetUpdatePosition", 1);
+ // Create an array containing objects each live member connected to us and for ourself.
+ BSONArrayBuilder arrayBuilder(cmdBuilder->subarrayStart("optimes"));
+ for (SlaveInfoVector::iterator itr = _slaveInfo.begin(); itr != _slaveInfo.end(); ++itr) {
+ if (itr->lastAppliedOpTime.isNull()) {
+ // Don't include info on members we haven't heard from yet.
+ continue;
+ }
+ // Don't include members we think are down.
+ if (!itr->self && itr->down) {
+ continue;
+ }
- BSONObjBuilder entry(arrayBuilder.subobjStart());
- itr->lastDurableOpTime.append(&entry, "durableOpTime");
- itr->lastAppliedOpTime.append(&entry, "appliedOpTime");
- entry.append("memberId", itr->memberId);
- entry.append("cfgver", _rsConfig.getConfigVersion());
+ BSONObjBuilder entry(arrayBuilder.subobjStart());
+ itr->lastDurableOpTime.append(&entry, "durableOpTime");
+ itr->lastAppliedOpTime.append(&entry, "appliedOpTime");
+ entry.append("memberId", itr->memberId);
+ entry.append("cfgver", _rsConfig.getConfigVersion());
+ }
}
-
+ // Add metadata to command. Old style parsing logic will reject the metadata.
+ prepareReplMetadata(OpTime(), cmdBuilder);
return true;
}
@@ -3267,37 +3270,35 @@ void ReplicationCoordinatorImpl::_processReplSetDeclareElectionWinner_finish(
*result = _topCoord->processReplSetDeclareElectionWinner(args, responseTerm);
}
-void ReplicationCoordinatorImpl::prepareReplResponseMetadata(const rpc::RequestInterface& request,
- const OpTime& lastOpTimeFromClient,
- BSONObjBuilder* builder) {
- if (request.getMetadata().hasField(rpc::kReplSetMetadataFieldName)) {
- rpc::ReplSetMetadata metadata;
- CBHStatus cbh = _replExecutor.scheduleWork(
- stdx::bind(&ReplicationCoordinatorImpl::_prepareReplResponseMetadata_finish,
- this,
- stdx::placeholders::_1,
- lastOpTimeFromClient,
- &metadata));
+void ReplicationCoordinatorImpl::prepareReplMetadata(const OpTime& lastOpTimeFromClient,
+ BSONObjBuilder* builder) const {
+ rpc::ReplSetMetadata metadata;
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return;
- }
-
- fassert(28709, cbh.getStatus());
- _replExecutor.wait(cbh.getValue());
+ CBHStatus cbh = _replExecutor.scheduleWork(
+ stdx::bind(&ReplicationCoordinatorImpl::_prepareReplResponseMetadata_finish,
+ this,
+ stdx::placeholders::_1,
+ lastOpTimeFromClient,
+ &metadata));
- metadata.writeToMetadata(builder);
+ if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
+ return;
}
+
+ fassert(28709, cbh.getStatus());
+ _replExecutor.wait(cbh.getValue());
+
+ metadata.writeToMetadata(builder);
}
void ReplicationCoordinatorImpl::_prepareReplResponseMetadata_finish(
const ReplicationExecutor::CallbackArgs& cbData,
const OpTime& lastOpTimeFromClient,
- rpc::ReplSetMetadata* metadata) {
+ rpc::ReplSetMetadata* metadata) const {
OpTime lastReadableOpTime = getCurrentCommittedSnapshotOpTime();
OpTime lastVisibleOpTime = std::max(lastOpTimeFromClient, lastReadableOpTime);
- _topCoord->prepareReplResponseMetadata(metadata, lastVisibleOpTime, _lastCommittedOpTime);
+ _topCoord->prepareReplMetadata(metadata, lastVisibleOpTime, _lastCommittedOpTime);
}
bool ReplicationCoordinatorImpl::isV1ElectionProtocol() {
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 33b93a70027..3c6bbaefdbb 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -290,9 +290,8 @@ public:
virtual Status processReplSetDeclareElectionWinner(const ReplSetDeclareElectionWinnerArgs& args,
long long* responseTerm) override;
- void prepareReplResponseMetadata(const rpc::RequestInterface&,
- const OpTime& lastOpTimeFromClient,
- BSONObjBuilder* builder) override;
+ virtual void prepareReplMetadata(const OpTime& lastOpTimeFromClient,
+ BSONObjBuilder* builder) const override;
virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args,
ReplSetHeartbeatResponse* response) override;
@@ -317,7 +316,7 @@ public:
virtual void onSnapshotCreate(OpTime timeOfSnapshot, SnapshotName name) override;
- virtual OpTime getCurrentCommittedSnapshotOpTime() override;
+ virtual OpTime getCurrentCommittedSnapshotOpTime() const override;
virtual void waitUntilSnapshotCommitted(OperationContext* txn,
const SnapshotName& untilSnapshot) override;
@@ -663,11 +662,11 @@ private:
Status* result);
/**
- * Bottom half of prepareReplResponseMetadata.
+ * Bottom half of prepareReplMetadata.
*/
void _prepareReplResponseMetadata_finish(const ReplicationExecutor::CallbackArgs& cbData,
const OpTime& lastOpTimeFromClient,
- rpc::ReplSetMetadata* metadata);
+ rpc::ReplSetMetadata* metadata) const;
/**
* Scheduled to cause the ReplicationCoordinator to reconsider any state that might
* need to change as a result of time passing - for instance becoming PRIMARY when a single
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index 2c791030e76..91fde3615b0 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -2038,7 +2038,7 @@ TEST_F(ReplCoordTest, NodeIncludesOtherMembersProgressInUpdatePositionCommand) {
getReplCoord()->prepareReplSetUpdatePositionCommand(&cmdBuilder);
BSONObj cmd = cmdBuilder.done();
- ASSERT_EQUALS(2, cmd.nFields());
+ ASSERT_EQUALS(3, cmd.nFields());
ASSERT_EQUALS("replSetUpdatePosition", cmd.firstElement().fieldNameStringData());
std::set<long long> memberIds;
@@ -4221,6 +4221,36 @@ TEST_F(ReplCoordTest, OnlyForwardSyncProgressForOtherNodesWhenTheNodesAreBelieve
ASSERT_EQUALS(1U, memberIds4.size());
}
+TEST_F(ReplCoordTest, NewStyleUpdatePositionCmdHasMetadata) {
+ assertStartSuccess(
+ BSON("_id"
+ << "mySet"
+ << "version" << 1 << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "test1:1234")
+ << BSON("_id" << 1 << "host"
+ << "test2:1234") << BSON("_id" << 2 << "host"
+ << "test3:1234"))
+ << "protocolVersion" << 1 << "settings"
+ << BSON("electionTimeoutMillis" << 2000 << "heartbeatIntervalMillis" << 40000)),
+ HostAndPort("test1", 1234));
+ OpTime optime(Timestamp(100, 2), 0);
+ getReplCoord()->setMyLastAppliedOpTime(optime);
+ getReplCoord()->setMyLastDurableOpTime(optime);
+
+ // Set last committed optime via metadata.
+ rpc::ReplSetMetadata syncSourceMetadata(optime.getTerm(), optime, optime, 1, OID(), -1, 1);
+ getReplCoord()->processReplSetMetadata(syncSourceMetadata);
+ getReplCoord()->onSnapshotCreate(optime, SnapshotName(1));
+
+ BSONObjBuilder cmdBuilder;
+ ASSERT_TRUE(getReplCoord()->prepareReplSetUpdatePositionCommand(&cmdBuilder));
+ BSONObj cmd = cmdBuilder.obj();
+ auto metadata = unittest::assertGet(rpc::ReplSetMetadata::readFromMetadata(cmd));
+ ASSERT_EQUALS(metadata.getTerm(), getReplCoord()->getTerm());
+ ASSERT_EQUALS(metadata.getLastOpVisible(), optime);
+}
+
TEST_F(ReplCoordTest, StepDownWhenHandleLivenessTimeoutMarksAMajorityOfVotingNodesDown) {
assertStartSuccess(
BSON("_id"
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index 7d3f987878f..cb763bed4ff 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -381,9 +381,8 @@ Status ReplicationCoordinatorMock::processReplSetDeclareElectionWinner(
return Status::OK();
}
-void ReplicationCoordinatorMock::prepareReplResponseMetadata(const rpc::RequestInterface& request,
- const OpTime& lastOpTimeFromClient,
- BSONObjBuilder* builder) {}
+void ReplicationCoordinatorMock::prepareReplMetadata(const OpTime& lastOpTimeFromClient,
+ BSONObjBuilder* builder) const {}
Status ReplicationCoordinatorMock::processHeartbeatV1(const ReplSetHeartbeatArgsV1& args,
ReplSetHeartbeatResponse* response) {
@@ -418,7 +417,7 @@ void ReplicationCoordinatorMock::onSnapshotCreate(OpTime timeOfSnapshot, Snapsho
void ReplicationCoordinatorMock::dropAllSnapshots() {}
-OpTime ReplicationCoordinatorMock::getCurrentCommittedSnapshotOpTime() {
+OpTime ReplicationCoordinatorMock::getCurrentCommittedSnapshotOpTime() const {
return OpTime();
}
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 15315cd5a8f..861799f00d2 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -217,9 +217,8 @@ public:
virtual Status processReplSetDeclareElectionWinner(const ReplSetDeclareElectionWinnerArgs& args,
long long* responseTerm);
- void prepareReplResponseMetadata(const rpc::RequestInterface& request,
- const OpTime& lastOpTimeFromClient,
- BSONObjBuilder* builder) override;
+ void prepareReplMetadata(const OpTime& lastOpTimeFromClient,
+ BSONObjBuilder* builder) const override;
virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args,
ReplSetHeartbeatResponse* response);
@@ -242,7 +241,7 @@ public:
virtual void dropAllSnapshots() override;
- virtual OpTime getCurrentCommittedSnapshotOpTime() override;
+ virtual OpTime getCurrentCommittedSnapshotOpTime() const override;
virtual void waitUntilSnapshotCommitted(OperationContext* txn,
const SnapshotName& untilSnapshot) override;
diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp
index ee0aa50a6f0..e6d90aa82ec 100644
--- a/src/mongo/db/repl/replset_commands.cpp
+++ b/src/mongo/db/repl/replset_commands.cpp
@@ -58,6 +58,7 @@
#include "mongo/db/service_context.h"
#include "mongo/db/storage/storage_engine.h"
#include "mongo/executor/network_interface.h"
+#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
@@ -664,7 +665,9 @@ public:
int,
string& errmsg,
BSONObjBuilder& result) {
- Status status = getGlobalReplicationCoordinator()->checkReplEnabledForCommand(&result);
+ auto replCoord = repl::ReplicationCoordinator::get(txn->getClient()->getServiceContext());
+
+ Status status = replCoord->checkReplEnabledForCommand(&result);
if (!status.isOK())
return appendCommandStatus(result, status);
@@ -673,6 +676,14 @@ public:
if (cmdObj.hasField("handshake"))
return true;
+ auto metadataResult = rpc::ReplSetMetadata::readFromMetadata(cmdObj);
+ if (metadataResult.isOK()) {
+ // New style update position command has metadata, which may inform the
+ // upstream of a higher term.
+ auto metadata = metadataResult.getValue();
+ replCoord->processReplSetMetadata(metadata);
+ }
+
// In the case of an update from a member with an invalid replica set config,
// we return our current config version.
long long configVersion = -1;
@@ -681,9 +692,8 @@ public:
status = args.initialize(cmdObj);
if (status.isOK()) {
- // v3.2.2+ style replSetUpdatePosition command.
- status = getGlobalReplicationCoordinator()->processReplSetUpdatePosition(
- args, &configVersion);
+ // v3.2.4+ style replSetUpdatePosition command.
+ status = replCoord->processReplSetUpdatePosition(args, &configVersion);
if (status == ErrorCodes::InvalidReplicaSetConfig) {
result.append("configVersion", configVersion);
@@ -696,8 +706,7 @@ public:
if (!status.isOK())
return appendCommandStatus(result, status);
- status = getGlobalReplicationCoordinator()->processReplSetUpdatePosition(
- oldArgs, &configVersion);
+ status = replCoord->processReplSetUpdatePosition(oldArgs, &configVersion);
if (status == ErrorCodes::InvalidReplicaSetConfig) {
result.append("configVersion", configVersion);
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index 5e8cda75505..3a51485052a 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -422,9 +422,9 @@ public:
/**
* Prepares a BSONObj describing the current term, primary, and lastOp information.
*/
- virtual void prepareReplResponseMetadata(rpc::ReplSetMetadata* metadata,
- const OpTime& lastVisibleOpTime,
- const OpTime& lastCommittedOpTime) const = 0;
+ virtual void prepareReplMetadata(rpc::ReplSetMetadata* metadata,
+ const OpTime& lastVisibleOpTime,
+ const OpTime& lastCommittedOpTime) const = 0;
/**
* Writes into 'output' all the information needed to generate a summary of the current
diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp
index afd720d7869..b235cbf5482 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl.cpp
@@ -2419,9 +2419,9 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS
return false;
}
-void TopologyCoordinatorImpl::prepareReplResponseMetadata(rpc::ReplSetMetadata* metadata,
- const OpTime& lastVisibleOpTime,
- const OpTime& lastCommittedOpTime) const {
+void TopologyCoordinatorImpl::prepareReplMetadata(rpc::ReplSetMetadata* metadata,
+ const OpTime& lastVisibleOpTime,
+ const OpTime& lastCommittedOpTime) const {
*metadata =
rpc::ReplSetMetadata(_term,
lastCommittedOpTime,
diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h
index 21b9799fca2..3645ebe1aaf 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.h
+++ b/src/mongo/db/repl/topology_coordinator_impl.h
@@ -225,9 +225,9 @@ public:
const OpTime& lastOpCommitted);
virtual bool stepDownIfPending();
virtual Date_t getStepDownTime() const;
- virtual void prepareReplResponseMetadata(rpc::ReplSetMetadata* metadata,
- const OpTime& lastVisibleOpTime,
- const OpTime& lastCommitttedOpTime) const;
+ virtual void prepareReplMetadata(rpc::ReplSetMetadata* metadata,
+ const OpTime& lastVisibleOpTime,
+ const OpTime& lastCommitttedOpTime) const;
Status processReplSetDeclareElectionWinner(const ReplSetDeclareElectionWinnerArgs& args,
long long* responseTerm);
virtual void processReplSetRequestVotes(const ReplSetRequestVotesArgs& args,