summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSiyuan Zhou <siyuan.zhou@mongodb.com>2016-05-21 01:06:12 -0400
committerSiyuan Zhou <siyuan.zhou@mongodb.com>2016-06-01 16:39:42 -0400
commit551fb26a8ad83c64ea018594c06fbda7002e996c (patch)
tree362abcc051bb10586bd299b2004b744c4fb141b8 /src
parent8d41cd981f778d3f9d4e3add11ee26bd81f4fb11 (diff)
downloadmongo-551fb26a8ad83c64ea018594c06fbda7002e996c.tar.gz
SERVER-22136 Attach term metadata to UpdatePosition command
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/dbcommands.cpp6
-rw-r--r--src/mongo/db/repl/replication_coordinator.h7
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp103
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h7
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp4
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_test.cpp36
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.cpp7
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.h7
-rw-r--r--src/mongo/db/repl/replset_commands.cpp18
-rw-r--r--src/mongo/db/repl/topology_coordinator.h6
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl.cpp6
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl.h6
12 files changed, 128 insertions, 85 deletions
diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp
index a7def1cb6c5..4b5df659bd7 100644
--- a/src/mongo/db/dbcommands.cpp
+++ b/src/mongo/db/dbcommands.cpp
@@ -92,6 +92,7 @@
#include "mongo/db/write_concern.h"
#include "mongo/rpc/metadata.h"
#include "mongo/rpc/metadata/config_server_metadata.h"
+#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/rpc/metadata/server_selection_metadata.h"
#include "mongo/rpc/metadata/sharding_metadata.h"
#include "mongo/rpc/protocol.h"
@@ -1237,8 +1238,9 @@ void appendOpTimeMetadata(OperationContext* txn,
// Attach our own last opTime.
repl::OpTime lastOpTimeFromClient =
repl::ReplClientInfo::forClient(txn->getClient()).getLastOp();
- replCoord->prepareReplResponseMetadata(request, lastOpTimeFromClient, metadataBob);
-
+ if (request.getMetadata().hasField(rpc::kReplSetMetadataFieldName)) {
+ replCoord->prepareReplMetadata(lastOpTimeFromClient, metadataBob);
+ }
// For commands from mongos, append some info to help getLastError(w) work.
// TODO: refactor out of here as part of SERVER-18236
if (isShardingAware || isConfig) {
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index 0f226a6b13b..3e915211977 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -679,9 +679,8 @@ public:
/**
* Prepares a metadata object describing the current term, primary, and lastOp information.
*/
- virtual void prepareReplResponseMetadata(const rpc::RequestInterface& request,
- const OpTime& lastOpTimeFromClient,
- BSONObjBuilder* builder) = 0;
+ virtual void prepareReplMetadata(const OpTime& lastOpTimeFromClient,
+ BSONObjBuilder* builder) const = 0;
/**
* Returns true if the V1 election protocol is being used and false otherwise.
@@ -759,7 +758,7 @@ public:
/**
* Gets the latest OpTime of the currentCommittedSnapshot.
*/
- virtual OpTime getCurrentCommittedSnapshotOpTime() = 0;
+ virtual OpTime getCurrentCommittedSnapshotOpTime() const = 0;
/**
* Appends connection information to the provided BSONObjBuilder.
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index b6c89737b5f..d3d80f5ce65 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -338,7 +338,7 @@ Date_t ReplicationCoordinatorImpl::getPriorityTakeover_forTest() const {
return _priorityTakeoverWhen;
}
-OpTime ReplicationCoordinatorImpl::getCurrentCommittedSnapshotOpTime() {
+OpTime ReplicationCoordinatorImpl::getCurrentCommittedSnapshotOpTime() const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_currentCommittedSnapshot) {
return _currentCommittedSnapshot->opTime;
@@ -1889,49 +1889,55 @@ int ReplicationCoordinatorImpl::_getMyId_inlock() const {
StatusWith<BSONObj> ReplicationCoordinatorImpl::prepareReplSetUpdatePositionCommand(
ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle) const {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- invariant(_rsConfig.isInitialized());
- // Do not send updates if we have been removed from the config.
- if (_selfIndex == -1) {
- return Status(ErrorCodes::NodeNotFound,
- "This node is not in the current replset configuration.");
- }
BSONObjBuilder cmdBuilder;
- cmdBuilder.append(UpdatePositionArgs::kCommandFieldName, 1);
- // Create an array containing objects each live member connected to us and for ourself.
- BSONArrayBuilder arrayBuilder(cmdBuilder.subarrayStart("optimes"));
- for (const auto& slaveInfo : _slaveInfo) {
- if (slaveInfo.lastAppliedOpTime.isNull()) {
- // Don't include info on members we haven't heard from yet.
- continue;
- }
- // Don't include members we think are down.
- if (!slaveInfo.self && slaveInfo.down) {
- continue;
- }
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ invariant(_rsConfig.isInitialized());
+ // Do not send updates if we have been removed from the config.
+ if (_selfIndex == -1) {
+ return Status(ErrorCodes::NodeNotFound,
+ "This node is not in the current replset configuration.");
+ }
+ cmdBuilder.append(UpdatePositionArgs::kCommandFieldName, 1);
+ // Create an array containing objects each live member connected to us and for ourself.
+ BSONArrayBuilder arrayBuilder(cmdBuilder.subarrayStart("optimes"));
+ for (const auto& slaveInfo : _slaveInfo) {
+ if (slaveInfo.lastAppliedOpTime.isNull()) {
+ // Don't include info on members we haven't heard from yet.
+ continue;
+ }
+ // Don't include members we think are down.
+ if (!slaveInfo.self && slaveInfo.down) {
+ continue;
+ }
- BSONObjBuilder entry(arrayBuilder.subobjStart());
- switch (commandStyle) {
- case ReplSetUpdatePositionCommandStyle::kNewStyle:
- slaveInfo.lastDurableOpTime.append(&entry,
- UpdatePositionArgs::kDurableOpTimeFieldName);
- slaveInfo.lastAppliedOpTime.append(&entry,
- UpdatePositionArgs::kAppliedOpTimeFieldName);
- break;
- case ReplSetUpdatePositionCommandStyle::kOldStyle:
- entry.append("_id", slaveInfo.rid);
- if (isV1ElectionProtocol()) {
- slaveInfo.lastDurableOpTime.append(&entry, "optime");
- } else {
- entry.append("optime", slaveInfo.lastDurableOpTime.getTimestamp());
- }
- break;
+ BSONObjBuilder entry(arrayBuilder.subobjStart());
+ switch (commandStyle) {
+ case ReplSetUpdatePositionCommandStyle::kNewStyle:
+ slaveInfo.lastDurableOpTime.append(&entry,
+ UpdatePositionArgs::kDurableOpTimeFieldName);
+ slaveInfo.lastAppliedOpTime.append(&entry,
+ UpdatePositionArgs::kAppliedOpTimeFieldName);
+ break;
+ case ReplSetUpdatePositionCommandStyle::kOldStyle:
+ entry.append("_id", slaveInfo.rid);
+ if (isV1ElectionProtocol()) {
+ slaveInfo.lastDurableOpTime.append(&entry, "optime");
+ } else {
+ entry.append("optime", slaveInfo.lastDurableOpTime.getTimestamp());
+ }
+ break;
+ }
+ entry.append(UpdatePositionArgs::kMemberIdFieldName, slaveInfo.memberId);
+ entry.append(UpdatePositionArgs::kConfigVersionFieldName, _rsConfig.getConfigVersion());
}
- entry.append(UpdatePositionArgs::kMemberIdFieldName, slaveInfo.memberId);
- entry.append(UpdatePositionArgs::kConfigVersionFieldName, _rsConfig.getConfigVersion());
+ arrayBuilder.done();
}
- arrayBuilder.done();
+ // Add metadata to command. Old style parsing logic will reject the metadata.
+ if (commandStyle == ReplSetUpdatePositionCommandStyle::kNewStyle) {
+ prepareReplMetadata(OpTime(), &cmdBuilder);
+ }
return cmdBuilder.obj();
}
@@ -3121,18 +3127,15 @@ Status ReplicationCoordinatorImpl::processReplSetRequestVotes(
return Status::OK();
}
-void ReplicationCoordinatorImpl::prepareReplResponseMetadata(const rpc::RequestInterface& request,
- const OpTime& lastOpTimeFromClient,
- BSONObjBuilder* builder) {
- if (request.getMetadata().hasField(rpc::kReplSetMetadataFieldName)) {
- rpc::ReplSetMetadata metadata;
- LockGuard topoLock(_topoMutex);
+void ReplicationCoordinatorImpl::prepareReplMetadata(const OpTime& lastOpTimeFromClient,
+ BSONObjBuilder* builder) const {
+ rpc::ReplSetMetadata metadata;
+ LockGuard topoLock(_topoMutex);
- OpTime lastReadableOpTime = getCurrentCommittedSnapshotOpTime();
- OpTime lastVisibleOpTime = std::max(lastOpTimeFromClient, lastReadableOpTime);
- _topCoord->prepareReplResponseMetadata(&metadata, lastVisibleOpTime, _lastCommittedOpTime);
- metadata.writeToMetadata(builder);
- }
+ OpTime lastReadableOpTime = getCurrentCommittedSnapshotOpTime();
+ OpTime lastVisibleOpTime = std::max(lastOpTimeFromClient, lastReadableOpTime);
+ _topCoord->prepareReplMetadata(&metadata, lastVisibleOpTime, _lastCommittedOpTime);
+ metadata.writeToMetadata(builder);
}
bool ReplicationCoordinatorImpl::isV1ElectionProtocol() const {
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index bfa93ac3459..41c11a156d7 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -286,9 +286,8 @@ public:
const ReplSetRequestVotesArgs& args,
ReplSetRequestVotesResponse* response) override;
- void prepareReplResponseMetadata(const rpc::RequestInterface&,
- const OpTime& lastOpTimeFromClient,
- BSONObjBuilder* builder) override;
+ void prepareReplMetadata(const OpTime& lastOpTimeFromClient,
+ BSONObjBuilder* builder) const override;
virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args,
ReplSetHeartbeatResponse* response) override;
@@ -313,7 +312,7 @@ public:
virtual void onSnapshotCreate(OpTime timeOfSnapshot, SnapshotName name) override;
- virtual OpTime getCurrentCommittedSnapshotOpTime() override;
+ virtual OpTime getCurrentCommittedSnapshotOpTime() const override;
virtual void waitUntilSnapshotCommitted(OperationContext* txn,
const SnapshotName& untilSnapshot) override;
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index 4c8945ac064..11ea4cf40ba 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -155,8 +155,8 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
replMetadata = responseStatus;
}
if (replMetadata.isOK()) {
- // Asynchronous stepdown could happen, but it will be queued in executor after
- // this function, so we cannot and don't need to wait for it to finish.
+ // Asynchronous stepdown could happen, but it will wait for _topoMutex and execute
+ // after this function, so we cannot and don't need to wait for it to finish.
_processReplSetMetadata_incallback(replMetadata.getValue());
}
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index 0343865aa6c..c40542ebc5e 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -1911,7 +1911,7 @@ TEST_F(ReplCoordTest, NodeIncludesOtherMembersProgressInUpdatePositionCommand) {
BSONObj cmd = unittest::assertGet(getReplCoord()->prepareReplSetUpdatePositionCommand(
ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle));
- ASSERT_EQUALS(2, cmd.nFields());
+ ASSERT_EQUALS(3, cmd.nFields());
ASSERT_EQUALS(UpdatePositionArgs::kCommandFieldName, cmd.firstElement().fieldNameStringData());
std::set<long long> memberIds;
@@ -4483,6 +4483,40 @@ TEST_F(ReplCoordTest, OnlyForwardSyncProgressForOtherNodesWhenTheNodesAreBelieve
ASSERT_EQUALS(1U, memberIds4.size());
}
+TEST_F(ReplCoordTest, NewStyleUpdatePositionCmdHasMetadata) {
+ assertStartSuccess(
+ BSON("_id"
+ << "mySet"
+ << "version"
+ << 1
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "test1:1234")
+ << BSON("_id" << 1 << "host"
+ << "test2:1234")
+ << BSON("_id" << 2 << "host"
+ << "test3:1234"))
+ << "protocolVersion"
+ << 1
+ << "settings"
+ << BSON("electionTimeoutMillis" << 2000 << "heartbeatIntervalMillis" << 40000)),
+ HostAndPort("test1", 1234));
+ OpTime optime(Timestamp(100, 2), 0);
+ getReplCoord()->setMyLastAppliedOpTime(optime);
+ getReplCoord()->setMyLastDurableOpTime(optime);
+
+ // Set last committed optime via metadata.
+ rpc::ReplSetMetadata syncSourceMetadata(optime.getTerm(), optime, optime, 1, OID(), -1, 1);
+ getReplCoord()->processReplSetMetadata(syncSourceMetadata);
+ getReplCoord()->onSnapshotCreate(optime, SnapshotName(1));
+
+ BSONObj cmd = unittest::assertGet(getReplCoord()->prepareReplSetUpdatePositionCommand(
+ ReplicationCoordinator::ReplSetUpdatePositionCommandStyle::kNewStyle));
+ auto metadata = unittest::assertGet(rpc::ReplSetMetadata::readFromMetadata(cmd));
+ ASSERT_EQUALS(metadata.getTerm(), getReplCoord()->getTerm());
+ ASSERT_EQUALS(metadata.getLastOpVisible(), optime);
+}
+
TEST_F(ReplCoordTest, StepDownWhenHandleLivenessTimeoutMarksAMajorityOfVotingNodesDown) {
assertStartSuccess(
BSON("_id"
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index 75efdf63845..ca59fd1a833 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -377,9 +377,8 @@ Status ReplicationCoordinatorMock::processReplSetRequestVotes(
return Status::OK();
}
-void ReplicationCoordinatorMock::prepareReplResponseMetadata(const rpc::RequestInterface& request,
- const OpTime& lastOpTimeFromClient,
- BSONObjBuilder* builder) {}
+void ReplicationCoordinatorMock::prepareReplMetadata(const OpTime& lastOpTimeFromClient,
+ BSONObjBuilder* builder) const {}
Status ReplicationCoordinatorMock::processHeartbeatV1(const ReplSetHeartbeatArgsV1& args,
ReplSetHeartbeatResponse* response) {
@@ -414,7 +413,7 @@ void ReplicationCoordinatorMock::onSnapshotCreate(OpTime timeOfSnapshot, Snapsho
void ReplicationCoordinatorMock::dropAllSnapshots() {}
-OpTime ReplicationCoordinatorMock::getCurrentCommittedSnapshotOpTime() {
+OpTime ReplicationCoordinatorMock::getCurrentCommittedSnapshotOpTime() const {
return OpTime();
}
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 3ace717f8c2..d2343d883ea 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -216,9 +216,8 @@ public:
const ReplSetRequestVotesArgs& args,
ReplSetRequestVotesResponse* response);
- void prepareReplResponseMetadata(const rpc::RequestInterface& request,
- const OpTime& lastOpTimeFromClient,
- BSONObjBuilder* builder) override;
+ void prepareReplMetadata(const OpTime& lastOpTimeFromClient,
+ BSONObjBuilder* builder) const override;
virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args,
ReplSetHeartbeatResponse* response);
@@ -241,7 +240,7 @@ public:
virtual void dropAllSnapshots() override;
- virtual OpTime getCurrentCommittedSnapshotOpTime() override;
+ virtual OpTime getCurrentCommittedSnapshotOpTime() const override;
virtual void waitUntilSnapshotCommitted(OperationContext* txn,
const SnapshotName& untilSnapshot) override;
diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp
index c91e28b71a6..0b26a4360ac 100644
--- a/src/mongo/db/repl/replset_commands.cpp
+++ b/src/mongo/db/repl/replset_commands.cpp
@@ -658,7 +658,9 @@ public:
int,
string& errmsg,
BSONObjBuilder& result) {
- Status status = getGlobalReplicationCoordinator()->checkReplEnabledForCommand(&result);
+ auto replCoord = repl::ReplicationCoordinator::get(txn->getClient()->getServiceContext());
+
+ Status status = replCoord->checkReplEnabledForCommand(&result);
if (!status.isOK())
return appendCommandStatus(result, status);
@@ -667,6 +669,14 @@ public:
if (cmdObj.hasField("handshake"))
return true;
+ auto metadataResult = rpc::ReplSetMetadata::readFromMetadata(cmdObj);
+ if (metadataResult.isOK()) {
+ // New style update position command has metadata, which may inform the
+ // upstream of a higher term.
+ auto metadata = metadataResult.getValue();
+ replCoord->processReplSetMetadata(metadata);
+ }
+
// In the case of an update from a member with an invalid replica set config,
// we return our current config version.
long long configVersion = -1;
@@ -676,8 +686,7 @@ public:
status = args.initialize(cmdObj);
if (status.isOK()) {
// v3.2.4+ style replSetUpdatePosition command.
- status = getGlobalReplicationCoordinator()->processReplSetUpdatePosition(
- args, &configVersion);
+ status = replCoord->processReplSetUpdatePosition(args, &configVersion);
if (status == ErrorCodes::InvalidReplicaSetConfig) {
result.append("configVersion", configVersion);
@@ -690,8 +699,7 @@ public:
if (!status.isOK())
return appendCommandStatus(result, status);
- status = getGlobalReplicationCoordinator()->processReplSetUpdatePosition(
- oldArgs, &configVersion);
+ status = replCoord->processReplSetUpdatePosition(oldArgs, &configVersion);
if (status == ErrorCodes::InvalidReplicaSetConfig) {
result.append("configVersion", configVersion);
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index 1548cb774a9..72c21024684 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -414,9 +414,9 @@ public:
/**
* Prepares a BSONObj describing the current term, primary, and lastOp information.
*/
- virtual void prepareReplResponseMetadata(rpc::ReplSetMetadata* metadata,
- const OpTime& lastVisibleOpTime,
- const OpTime& lastCommittedOpTime) const = 0;
+ virtual void prepareReplMetadata(rpc::ReplSetMetadata* metadata,
+ const OpTime& lastVisibleOpTime,
+ const OpTime& lastCommittedOpTime) const = 0;
/**
* Writes into 'output' all the information needed to generate a summary of the current
diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp
index fb3c5aaec98..8d1ed0e9992 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl.cpp
@@ -2370,9 +2370,9 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS
return false;
}
-void TopologyCoordinatorImpl::prepareReplResponseMetadata(rpc::ReplSetMetadata* metadata,
- const OpTime& lastVisibleOpTime,
- const OpTime& lastCommittedOpTime) const {
+void TopologyCoordinatorImpl::prepareReplMetadata(rpc::ReplSetMetadata* metadata,
+ const OpTime& lastVisibleOpTime,
+ const OpTime& lastCommittedOpTime) const {
*metadata =
rpc::ReplSetMetadata(_term,
lastCommittedOpTime,
diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h
index d3eb233acb8..fefebd6d9db 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.h
+++ b/src/mongo/db/repl/topology_coordinator_impl.h
@@ -217,9 +217,9 @@ public:
virtual bool stepDown(Date_t until, bool force, const OpTime& lastOpApplied);
virtual bool stepDownIfPending();
virtual Date_t getStepDownTime() const;
- virtual void prepareReplResponseMetadata(rpc::ReplSetMetadata* metadata,
- const OpTime& lastVisibleOpTime,
- const OpTime& lastCommitttedOpTime) const;
+ virtual void prepareReplMetadata(rpc::ReplSetMetadata* metadata,
+ const OpTime& lastVisibleOpTime,
+ const OpTime& lastCommitttedOpTime) const;
virtual void processReplSetRequestVotes(const ReplSetRequestVotesArgs& args,
ReplSetRequestVotesResponse* response,
const OpTime& lastAppliedOpTime);