summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl
diff options
context:
space:
mode:
authorJudah Schvimer <judah@mongodb.com>2017-01-20 10:08:50 -0500
committerJudah Schvimer <judah@mongodb.com>2017-01-20 10:08:50 -0500
commit859dfb093328ae9129f18952df4f25b123977a38 (patch)
treee558ffd4bc0e315967684ecdd74fcc58108a7530 /src/mongo/db/repl
parent6daec9687bb98fd4d2e6f4627afd9ad85a11d66b (diff)
downloadmongo-859dfb093328ae9129f18952df4f25b123977a38.tar.gz
SERVER-27543 send OplogQueryMetadata with OplogFetcher queries
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--src/mongo/db/repl/oplog_fetcher.cpp3
-rw-r--r--src/mongo/db/repl/oplog_fetcher_test.cpp4
-rw-r--r--src/mongo/db/repl/replication_coordinator.h8
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp46
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h20
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_test.cpp75
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.cpp3
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.h3
-rw-r--r--src/mongo/db/repl/topology_coordinator.h16
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl.cpp30
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl.h8
11 files changed, 185 insertions, 31 deletions
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index f976b9ac18d..3bc3284bbee 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/stats/timer_stats.h"
+#include "mongo/rpc/metadata/oplog_query_metadata.h"
#include "mongo/rpc/metadata/server_selection_metadata.h"
#include "mongo/stdx/memory.h"
#include "mongo/stdx/mutex.h"
@@ -115,6 +116,8 @@ StatusWith<BSONObj> makeMetadataObject(bool isV1ElectionProtocol) {
return isV1ElectionProtocol
? BSON(rpc::kReplSetMetadataFieldName
<< 1
+ << rpc::kOplogQueryMetadataFieldName
+ << 1
<< rpc::ServerSelectionMetadata::fieldName()
<< BSON(rpc::ServerSelectionMetadata::kSecondaryOkFieldName << true))
: rpc::ServerSelectionMetadata(true, boost::none).toBSON();
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
index 1c77b47bd3e..5b753cce881 100644
--- a/src/mongo/db/repl/oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -35,6 +35,7 @@
#include "mongo/db/repl/oplog_fetcher.h"
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/rpc/metadata.h"
+#include "mongo/rpc/metadata/oplog_query_metadata.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/rpc/metadata/server_selection_metadata.h"
#include "mongo/stdx/memory.h"
@@ -399,8 +400,9 @@ TEST_F(OplogFetcherTest, MetadataObjectContainsReplSetMetadataFieldUnderProtocol
enqueueDocumentsFn,
[](Status, OpTimeWithHash) {})
.getMetadataObject_forTest();
- ASSERT_EQUALS(2, metadataObj.nFields());
+ ASSERT_EQUALS(3, metadataObj.nFields());
ASSERT_EQUALS(1, metadataObj[rpc::kReplSetMetadataFieldName].numberInt());
+ ASSERT_EQUALS(1, metadataObj[rpc::kOplogQueryMetadataFieldName].numberInt());
}
TEST_F(OplogFetcherTest, MetadataObjectIsEmptyUnderProtocolVersion0) {
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index c33582a0aba..79deadb3bf9 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -57,9 +57,9 @@ struct ConnectionPoolStats;
namespace rpc {
+class OplogQueryMetadata;
class ReplSetMetadata;
class RequestInterface;
-class ReplSetMetadata;
} // namespace rpc
@@ -689,9 +689,11 @@ public:
ReplSetRequestVotesResponse* response) = 0;
/**
- * Prepares a metadata object describing the current term, primary, and lastOp information.
+ * Prepares a metadata object with the ReplSetMetadata and the OplogQueryMetadata depending
+ * on what has been requested.
*/
- virtual void prepareReplMetadata(const OpTime& lastOpTimeFromClient,
+ virtual void prepareReplMetadata(const BSONObj& metadataRequestObj,
+ const OpTime& lastOpTimeFromClient,
BSONObjBuilder* builder) const = 0;
/**
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index a6e427ae879..66db01ccfe9 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -73,6 +73,7 @@
#include "mongo/db/write_concern.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/executor/connection_pool_stats.h"
+#include "mongo/rpc/metadata/oplog_query_metadata.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/rpc/metadata/server_selection_metadata.h"
#include "mongo/rpc/request_interface.h"
@@ -385,6 +386,10 @@ Date_t ReplicationCoordinatorImpl::getPriorityTakeover_forTest() const {
OpTime ReplicationCoordinatorImpl::getCurrentCommittedSnapshotOpTime() const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _getCurrentCommittedSnapshotOpTime_inlock();
+}
+
+OpTime ReplicationCoordinatorImpl::_getCurrentCommittedSnapshotOpTime_inlock() const {
if (_currentCommittedSnapshot) {
return _currentCommittedSnapshot->opTime;
}
@@ -2015,7 +2020,9 @@ StatusWith<BSONObj> ReplicationCoordinatorImpl::prepareReplSetUpdatePositionComm
// Add metadata to command. Old style parsing logic will reject the metadata.
if (commandStyle == ReplSetUpdatePositionCommandStyle::kNewStyle) {
- prepareReplMetadata(OpTime(), &cmdBuilder);
+ stdx::lock_guard<stdx::mutex> topoLock(_topoMutex);
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ _prepareReplSetMetadata_inlock(OpTime(), &cmdBuilder);
}
return cmdBuilder.obj();
}
@@ -3250,14 +3257,41 @@ Status ReplicationCoordinatorImpl::processReplSetRequestVotes(
return Status::OK();
}
-void ReplicationCoordinatorImpl::prepareReplMetadata(const OpTime& lastOpTimeFromClient,
+void ReplicationCoordinatorImpl::prepareReplMetadata(const BSONObj& metadataRequestObj,
+ const OpTime& lastOpTimeFromClient,
BSONObjBuilder* builder) const {
- rpc::ReplSetMetadata metadata;
+
+ bool hasReplSetMetadata = metadataRequestObj.hasField(rpc::kReplSetMetadataFieldName);
+ bool hasOplogQueryMetadata = metadataRequestObj.hasField(rpc::kOplogQueryMetadataFieldName);
+ // Don't take any locks if we do not need to.
+ if (!hasReplSetMetadata && !hasOplogQueryMetadata) {
+ return;
+ }
+
LockGuard topoLock(_topoMutex);
+ LockGuard lock(_mutex);
+
+ if (hasReplSetMetadata) {
+ _prepareReplSetMetadata_inlock(lastOpTimeFromClient, builder);
+ }
+
+ if (hasOplogQueryMetadata) {
+ _prepareOplogQueryMetadata_inlock(builder);
+ }
+}
+
+void ReplicationCoordinatorImpl::_prepareReplSetMetadata_inlock(const OpTime& lastOpTimeFromClient,
+ BSONObjBuilder* builder) const {
+ OpTime lastVisibleOpTime =
+ std::max(lastOpTimeFromClient, _getCurrentCommittedSnapshotOpTime_inlock());
+ auto metadata = _topCoord->prepareReplSetMetadata(lastVisibleOpTime, _lastCommittedOpTime);
+ metadata.writeToMetadata(builder);
+}
- OpTime lastReadableOpTime = getCurrentCommittedSnapshotOpTime();
- OpTime lastVisibleOpTime = std::max(lastOpTimeFromClient, lastReadableOpTime);
- _topCoord->prepareReplMetadata(&metadata, lastVisibleOpTime, _lastCommittedOpTime);
+void ReplicationCoordinatorImpl::_prepareOplogQueryMetadata_inlock(BSONObjBuilder* builder) const {
+ OpTime lastAppliedOpTime = _getMyLastAppliedOpTime_inlock();
+ auto metadata =
+ _topCoord->prepareOplogQueryMetadata(_lastCommittedOpTime, lastAppliedOpTime, _rbid);
metadata.writeToMetadata(builder);
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 9b7eb084d2e..b8f19b16271 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -65,6 +65,7 @@ struct ConnectionPoolStats;
} // namespace executor
namespace rpc {
+class OplogQueryMetadata;
class ReplSetMetadata;
} // namespace rpc
@@ -270,7 +271,8 @@ public:
const ReplSetRequestVotesArgs& args,
ReplSetRequestVotesResponse* response) override;
- virtual void prepareReplMetadata(const OpTime& lastOpTimeFromClient,
+ virtual void prepareReplMetadata(const BSONObj& metadataRequestObj,
+ const OpTime& lastOpTimeFromClient,
BSONObjBuilder* builder) const override;
virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args,
@@ -554,6 +556,11 @@ private:
bool getWriteConcernMajorityShouldJournal_inlock() const;
/**
+ * Returns the OpTime of the current committed snapshot, if one exists.
+ */
+ OpTime _getCurrentCommittedSnapshotOpTime_inlock() const;
+
+ /**
* Helper method that removes entries from _slaveInfo if they correspond to a node
* with a member ID that is not in the current replica set config. Will always leave an
* entry for ourself at the beginning of _slaveInfo, even if we aren't present in the
@@ -978,6 +985,17 @@ private:
bool advanceCommitPoint);
/**
+ * Prepares a metadata object for ReplSetMetadata.
+ */
+ void _prepareReplSetMetadata_inlock(const OpTime& lastOpTimeFromClient,
+ BSONObjBuilder* builder) const;
+
+ /**
+ * Prepares a metadata object for OplogQueryMetadata.
+ */
+ void _prepareOplogQueryMetadata_inlock(BSONObjBuilder* builder) const;
+
+ /**
* Blesses a snapshot to be used for new committed reads.
*/
void _updateCommittedSnapshot_inlock(SnapshotInfo newCommittedSnapshot);
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index 757a422bf70..ceaae23fbea 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -59,6 +59,7 @@
#include "mongo/db/service_context_noop.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/executor/network_interface_mock.h"
+#include "mongo/rpc/metadata/oplog_query_metadata.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/stdx/functional.h"
#include "mongo/stdx/future.h"
@@ -4071,6 +4072,80 @@ TEST_F(ReplCoordTest,
ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex());
}
+TEST_F(ReplCoordTest, PrepareOplogQueryMetadata) {
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version"
+ << 2
+ << "members"
+ << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id"
+ << 0)
+ << BSON("host"
+ << "node2:12345"
+ << "_id"
+ << 1)
+ << BSON("host"
+ << "node3:12345"
+ << "_id"
+ << 2))
+ << "protocolVersion"
+ << 1),
+ HostAndPort("node1", 12345));
+ getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
+
+ // Update committed optime with ReplSetMetadata.
+ OpTime optime1{Timestamp(10, 0), 3};
+ OpTime optime2{Timestamp(11, 2), 5};
+
+ StatusWith<rpc::ReplSetMetadata> metadataToProcess = rpc::ReplSetMetadata::readFromMetadata(
+ BSON(rpc::kReplSetMetadataFieldName
+ << BSON("lastOpCommitted" << optime1.toBSON() << "lastOpVisible" << optime1.toBSON()
+ << "configVersion"
+ << 2
+ << "primaryIndex"
+ << 2
+ << "term"
+ << 3
+ << "syncSourceIndex"
+ << 1)));
+ getReplCoord()->processReplSetMetadata(metadataToProcess.getValue(), true);
+
+ getReplCoord()->setMyLastAppliedOpTime(optime2);
+
+ // Get current rbid to check against.
+ BSONObjBuilder result;
+ getReplCoord()->processReplSetGetRBID(&result);
+ int initialValue = result.obj()["rbid"].Int();
+
+ BSONObjBuilder metadataBob;
+ getReplCoord()->prepareReplMetadata(
+ BSON(rpc::kOplogQueryMetadataFieldName << 1 << rpc::kReplSetMetadataFieldName << 1),
+ OpTime(),
+ &metadataBob);
+
+ BSONObj metadata = metadataBob.done();
+ log() << metadata;
+
+ auto oqMetadata = rpc::OplogQueryMetadata::readFromMetadata(metadata);
+ ASSERT_OK(oqMetadata.getStatus());
+ ASSERT_EQ(oqMetadata.getValue().getLastOpCommitted(), optime1);
+ ASSERT_EQ(oqMetadata.getValue().getLastOpApplied(), optime2);
+ ASSERT_EQ(oqMetadata.getValue().getRBID(), initialValue);
+ ASSERT_EQ(oqMetadata.getValue().getSyncSourceIndex(), -1);
+ ASSERT_EQ(oqMetadata.getValue().getPrimaryIndex(), -1);
+
+ auto replMetadata = rpc::ReplSetMetadata::readFromMetadata(metadata);
+ ASSERT_OK(replMetadata.getStatus());
+ ASSERT_EQ(replMetadata.getValue().getLastOpCommitted(), optime1);
+ ASSERT_EQ(replMetadata.getValue().getLastOpVisible(), OpTime());
+ ASSERT_EQ(replMetadata.getValue().getConfigVersion(), 2);
+ ASSERT_EQ(replMetadata.getValue().getTerm(), 3);
+ ASSERT_EQ(replMetadata.getValue().getSyncSourceIndex(), -1);
+ ASSERT_EQ(replMetadata.getValue().getPrimaryIndex(), -1);
+}
+
TEST_F(ReplCoordTest, TermAndLastCommittedOpTimeUpdatedFromHeartbeatWhenArbiter) {
// Ensure that the metadata is processed if it is contained in a heartbeat response.
assertStartSuccess(BSON("_id"
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index ee2c819bc4d..9663600063c 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -397,7 +397,8 @@ Status ReplicationCoordinatorMock::processReplSetRequestVotes(
return Status::OK();
}
-void ReplicationCoordinatorMock::prepareReplMetadata(const OpTime& lastOpTimeFromClient,
+void ReplicationCoordinatorMock::prepareReplMetadata(const BSONObj& metadataRequestObj,
+ const OpTime& lastOpTimeFromClient,
BSONObjBuilder* builder) const {}
Status ReplicationCoordinatorMock::processHeartbeatV1(const ReplSetHeartbeatArgsV1& args,
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 67d6a555274..d2b00cc206e 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -221,7 +221,8 @@ public:
const ReplSetRequestVotesArgs& args,
ReplSetRequestVotesResponse* response);
- void prepareReplMetadata(const OpTime& lastOpTimeFromClient,
+ void prepareReplMetadata(const BSONObj& metadataRequestObj,
+ const OpTime& lastOpTimeFromClient,
BSONObjBuilder* builder) const override;
virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args,
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index 26462a99c68..7dcf76164ec 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -435,11 +435,19 @@ public:
virtual void setMyHeartbeatMessage(const Date_t now, const std::string& s) = 0;
/**
- * Prepares a BSONObj describing the current term, primary, and lastOp information.
+ * Prepares a ReplSetMetadata object describing the current term, primary, and lastOp
+ * information.
*/
- virtual void prepareReplMetadata(rpc::ReplSetMetadata* metadata,
- const OpTime& lastVisibleOpTime,
- const OpTime& lastCommittedOpTime) const = 0;
+ virtual rpc::ReplSetMetadata prepareReplSetMetadata(
+ const OpTime& lastVisibleOpTime, const OpTime& lastCommittedOpTime) const = 0;
+
+ /**
+ * Prepares an OplogQueryMetadata object describing the current sync source, rbid, primary,
+ * lastOpApplied, and lastOpCommitted.
+ */
+ virtual rpc::OplogQueryMetadata prepareOplogQueryMetadata(const OpTime& lastCommittedOpTime,
+ const OpTime& lastAppliedOpTime,
+ int rbid) const = 0;
/**
* Writes into 'output' all the information needed to generate a summary of the current
diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp
index a96790c41a5..1f6d11a86bb 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl.cpp
@@ -48,6 +48,7 @@
#include "mongo/db/repl/replication_executor.h"
#include "mongo/db/repl/rslog.h"
#include "mongo/db/server_parameters.h"
+#include "mongo/rpc/metadata/oplog_query_metadata.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/hex.h"
@@ -2511,17 +2512,24 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS
return false;
}
-void TopologyCoordinatorImpl::prepareReplMetadata(rpc::ReplSetMetadata* metadata,
- const OpTime& lastVisibleOpTime,
- const OpTime& lastCommittedOpTime) const {
- *metadata =
- rpc::ReplSetMetadata(_term,
- lastCommittedOpTime,
- lastVisibleOpTime,
- _rsConfig.getConfigVersion(),
- _rsConfig.getReplicaSetId(),
- _currentPrimaryIndex,
- _rsConfig.findMemberIndexByHostAndPort(getSyncSourceAddress()));
+rpc::ReplSetMetadata TopologyCoordinatorImpl::prepareReplSetMetadata(
+ const OpTime& lastVisibleOpTime, const OpTime& lastCommittedOpTime) const {
+ return rpc::ReplSetMetadata(_term,
+ lastCommittedOpTime,
+ lastVisibleOpTime,
+ _rsConfig.getConfigVersion(),
+ _rsConfig.getReplicaSetId(),
+ _currentPrimaryIndex,
+ _rsConfig.findMemberIndexByHostAndPort(getSyncSourceAddress()));
+}
+
+rpc::OplogQueryMetadata TopologyCoordinatorImpl::prepareOplogQueryMetadata(
+ const OpTime& lastCommittedOpTime, const OpTime& lastAppliedOpTime, int rbid) const {
+ return rpc::OplogQueryMetadata(lastCommittedOpTime,
+ lastAppliedOpTime,
+ rbid,
+ _currentPrimaryIndex,
+ _rsConfig.findMemberIndexByHostAndPort(getSyncSourceAddress()));
}
void TopologyCoordinatorImpl::summarizeAsHtml(ReplSetHtmlSummary* output) {
diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h
index ec2b02ebccd..57dabbf6ad9 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.h
+++ b/src/mongo/db/repl/topology_coordinator_impl.h
@@ -223,9 +223,11 @@ public:
const OpTime& lastOpCommitted);
virtual bool stepDownIfPending();
virtual Date_t getStepDownTime() const;
- virtual void prepareReplMetadata(rpc::ReplSetMetadata* metadata,
- const OpTime& lastVisibleOpTime,
- const OpTime& lastCommitttedOpTime) const;
+ virtual rpc::ReplSetMetadata prepareReplSetMetadata(const OpTime& lastVisibleOpTime,
+ const OpTime& lastCommitttedOpTime) const;
+ virtual rpc::OplogQueryMetadata prepareOplogQueryMetadata(const OpTime& lastCommittedOpTime,
+ const OpTime& lastAppliedOpTime,
+ int rbid) const;
virtual void processReplSetRequestVotes(const ReplSetRequestVotesArgs& args,
ReplSetRequestVotesResponse* response,
const OpTime& lastAppliedOpTime);