diff options
author | Judah Schvimer <judah@mongodb.com> | 2017-01-20 10:08:50 -0500 |
---|---|---|
committer | Judah Schvimer <judah@mongodb.com> | 2017-04-06 09:54:34 -0400 |
commit | cbe53c003584cae8efad1715aeac181c8f8d2eb7 (patch) | |
tree | 3f60d340aad2149e3a925aabb9344edc636a3f32 /src | |
parent | b4e4f6c70b4324b8e5ffd8f292922c201ed7c92d (diff) | |
download | mongo-cbe53c003584cae8efad1715aeac181c8f8d2eb7.tar.gz |
SERVER-27543 send OplogQueryMetadata with OplogFetcher queries
(cherry picked from commit 859dfb093328ae9129f18952df4f25b123977a38)
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/commands/dbcommands.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher_test.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator.h | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 46 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.h | 20 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_test.cpp | 75 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_mock.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_mock.h | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator.h | 16 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl.cpp | 30 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl.h | 8 |
12 files changed, 187 insertions, 34 deletions
diff --git a/src/mongo/db/commands/dbcommands.cpp b/src/mongo/db/commands/dbcommands.cpp index f1c4037c32c..d577aa5dde2 100644 --- a/src/mongo/db/commands/dbcommands.cpp +++ b/src/mongo/db/commands/dbcommands.cpp @@ -95,6 +95,7 @@ #include "mongo/db/write_concern.h" #include "mongo/rpc/metadata.h" #include "mongo/rpc/metadata/config_server_metadata.h" +#include "mongo/rpc/metadata/oplog_query_metadata.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/rpc/metadata/server_selection_metadata.h" #include "mongo/rpc/metadata/sharding_metadata.h" @@ -1250,9 +1251,7 @@ void appendOpTimeMetadata(OperationContext* txn, // Attach our own last opTime. repl::OpTime lastOpTimeFromClient = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(); - if (request.getMetadata().hasField(rpc::kReplSetMetadataFieldName)) { - replCoord->prepareReplMetadata(lastOpTimeFromClient, metadataBob); - } + replCoord->prepareReplMetadata(request.getMetadata(), lastOpTimeFromClient, metadataBob); // For commands from mongos, append some info to help getLastError(w) work. // TODO: refactor out of here as part of SERVER-18236 if (isShardingAware || isConfig) { diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp index f9b0ed2a265..ccc8865cbf3 100644 --- a/src/mongo/db/repl/oplog_fetcher.cpp +++ b/src/mongo/db/repl/oplog_fetcher.cpp @@ -37,6 +37,7 @@ #include "mongo/db/jsobj.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/stats/timer_stats.h" +#include "mongo/rpc/metadata/oplog_query_metadata.h" #include "mongo/rpc/metadata/server_selection_metadata.h" #include "mongo/stdx/memory.h" #include "mongo/stdx/mutex.h" @@ -129,6 +130,8 @@ StatusWith<BSONObj> makeMetadataObject(bool isV1ElectionProtocol) { return isV1ElectionProtocol ? BSON(rpc::kReplSetMetadataFieldName << 1 + << rpc::kOplogQueryMetadataFieldName + << 1 << rpc::ServerSelectionMetadata::fieldName() << BSON(rpc::ServerSelectionMetadata::kSecondaryOkFieldName << true)) : rpc::ServerSelectionMetadata(true, boost::none).toBSON(); diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp index b8c8f60440e..5c6db18013e 100644 --- a/src/mongo/db/repl/oplog_fetcher_test.cpp +++ b/src/mongo/db/repl/oplog_fetcher_test.cpp @@ -35,6 +35,7 @@ #include "mongo/db/repl/oplog_fetcher.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/rpc/metadata.h" +#include "mongo/rpc/metadata/oplog_query_metadata.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/rpc/metadata/server_selection_metadata.h" #include "mongo/stdx/memory.h" @@ -399,8 +400,9 @@ TEST_F(OplogFetcherTest, MetadataObjectContainsReplSetMetadataFieldUnderProtocol enqueueDocumentsFn, [](Status, OpTimeWithHash) {}) .getMetadataObject_forTest(); - ASSERT_EQUALS(2, metadataObj.nFields()); + ASSERT_EQUALS(3, metadataObj.nFields()); ASSERT_EQUALS(1, metadataObj[rpc::kReplSetMetadataFieldName].numberInt()); + ASSERT_EQUALS(1, metadataObj[rpc::kOplogQueryMetadataFieldName].numberInt()); } TEST_F(OplogFetcherTest, MetadataObjectIsEmptyUnderProtocolVersion0) { diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index 160fa12b2ea..54f3d4480aa 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -57,9 +57,9 @@ struct ConnectionPoolStats; namespace rpc { +class OplogQueryMetadata; class ReplSetMetadata; class RequestInterface; -class ReplSetMetadata; } // namespace rpc @@ -739,9 +739,11 @@ public: ReplSetRequestVotesResponse* response) = 0; /** - * Prepares a metadata object describing the current term, primary, and lastOp information. + * Prepares a metadata object with the ReplSetMetadata and the OplogQueryMetadata depending + * on what has been requested. */ - virtual void prepareReplMetadata(const OpTime& lastOpTimeFromClient, + virtual void prepareReplMetadata(const BSONObj& metadataRequestObj, + const OpTime& lastOpTimeFromClient, BSONObjBuilder* builder) const = 0; /** diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 15bad1ec64a..bee75d062af 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -73,6 +73,7 @@ #include "mongo/db/write_concern.h" #include "mongo/db/write_concern_options.h" #include "mongo/executor/connection_pool_stats.h" +#include "mongo/rpc/metadata/oplog_query_metadata.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/rpc/metadata/server_selection_metadata.h" #include "mongo/rpc/request_interface.h" @@ -421,6 +422,10 @@ boost::optional<Date_t> ReplicationCoordinatorImpl::getPriorityTakeover_forTest( OpTime ReplicationCoordinatorImpl::getCurrentCommittedSnapshotOpTime() const { stdx::lock_guard<stdx::mutex> lk(_mutex); + return _getCurrentCommittedSnapshotOpTime_inlock(); +} + +OpTime ReplicationCoordinatorImpl::_getCurrentCommittedSnapshotOpTime_inlock() const { if (_currentCommittedSnapshot) { return _currentCommittedSnapshot->opTime; } @@ -2040,7 +2045,9 @@ StatusWith<BSONObj> ReplicationCoordinatorImpl::prepareReplSetUpdatePositionComm // Add metadata to command. Old style parsing logic will reject the metadata. if (commandStyle == ReplSetUpdatePositionCommandStyle::kNewStyle) { - prepareReplMetadata(OpTime(), &cmdBuilder); + stdx::lock_guard<stdx::mutex> topoLock(_topoMutex); + stdx::lock_guard<stdx::mutex> lock(_mutex); + _prepareReplSetMetadata_inlock(OpTime(), &cmdBuilder); } return cmdBuilder.obj(); } @@ -3268,14 +3275,41 @@ Status ReplicationCoordinatorImpl::processReplSetRequestVotes( return Status::OK(); } -void ReplicationCoordinatorImpl::prepareReplMetadata(const OpTime& lastOpTimeFromClient, +void ReplicationCoordinatorImpl::prepareReplMetadata(const BSONObj& metadataRequestObj, + const OpTime& lastOpTimeFromClient, BSONObjBuilder* builder) const { - rpc::ReplSetMetadata metadata; + + bool hasReplSetMetadata = metadataRequestObj.hasField(rpc::kReplSetMetadataFieldName); + bool hasOplogQueryMetadata = metadataRequestObj.hasField(rpc::kOplogQueryMetadataFieldName); + // Don't take any locks if we do not need to. + if (!hasReplSetMetadata && !hasOplogQueryMetadata) { + return; + } + LockGuard topoLock(_topoMutex); + LockGuard lock(_mutex); + + if (hasReplSetMetadata) { + _prepareReplSetMetadata_inlock(lastOpTimeFromClient, builder); + } + + if (hasOplogQueryMetadata) { + _prepareOplogQueryMetadata_inlock(builder); + } +} + +void ReplicationCoordinatorImpl::_prepareReplSetMetadata_inlock(const OpTime& lastOpTimeFromClient, + BSONObjBuilder* builder) const { + OpTime lastVisibleOpTime = + std::max(lastOpTimeFromClient, _getCurrentCommittedSnapshotOpTime_inlock()); + auto metadata = _topCoord->prepareReplSetMetadata(lastVisibleOpTime, _lastCommittedOpTime); + metadata.writeToMetadata(builder); +} - OpTime lastReadableOpTime = getCurrentCommittedSnapshotOpTime(); - OpTime lastVisibleOpTime = std::max(lastOpTimeFromClient, lastReadableOpTime); - _topCoord->prepareReplMetadata(&metadata, lastVisibleOpTime, _lastCommittedOpTime); +void ReplicationCoordinatorImpl::_prepareOplogQueryMetadata_inlock(BSONObjBuilder* builder) const { + OpTime lastAppliedOpTime = _getMyLastAppliedOpTime_inlock(); + auto metadata = + _topCoord->prepareOplogQueryMetadata(_lastCommittedOpTime, lastAppliedOpTime, _rbid); metadata.writeToMetadata(builder); } diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 37b81585366..789279b5252 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -65,6 +65,7 @@ struct ConnectionPoolStats; } // namespace executor namespace rpc { +class OplogQueryMetadata; class ReplSetMetadata; } // namespace rpc @@ -276,7 +277,8 @@ public: const ReplSetRequestVotesArgs& args, ReplSetRequestVotesResponse* response) override; - virtual void prepareReplMetadata(const OpTime& lastOpTimeFromClient, + virtual void prepareReplMetadata(const BSONObj& metadataRequestObj, + const OpTime& lastOpTimeFromClient, BSONObjBuilder* builder) const override; virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args, @@ -576,6 +578,11 @@ private: bool getWriteConcernMajorityShouldJournal_inlock() const; /** + * Returns the OpTime of the current committed snapshot, if one exists. + */ + OpTime _getCurrentCommittedSnapshotOpTime_inlock() const; + + /** * Helper method that removes entries from _slaveInfo if they correspond to a node * with a member ID that is not in the current replica set config. Will always leave an * entry for ourself at the beginning of _slaveInfo, even if we aren't present in the @@ -1006,6 +1013,17 @@ private: bool advanceCommitPoint); /** + * Prepares a metadata object for ReplSetMetadata. + */ + void _prepareReplSetMetadata_inlock(const OpTime& lastOpTimeFromClient, + BSONObjBuilder* builder) const; + + /** + * Prepares a metadata object for OplogQueryMetadata. + */ + void _prepareOplogQueryMetadata_inlock(BSONObjBuilder* builder) const; + + /** * Blesses a snapshot to be used for new committed reads. */ void _updateCommittedSnapshot_inlock(SnapshotInfo newCommittedSnapshot); diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index fa0185522ce..85a0bfe085b 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -59,6 +59,7 @@ #include "mongo/db/service_context_noop.h" #include "mongo/db/write_concern_options.h" #include "mongo/executor/network_interface_mock.h" +#include "mongo/rpc/metadata/oplog_query_metadata.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/future.h" @@ -4069,6 +4070,80 @@ TEST_F(ReplCoordTest, ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex()); } +TEST_F(ReplCoordTest, PrepareOplogQueryMetadata) { + assertStartSuccess(BSON("_id" + << "mySet" + << "version" + << 2 + << "members" + << BSON_ARRAY(BSON("host" + << "node1:12345" + << "_id" + << 0) + << BSON("host" + << "node2:12345" + << "_id" + << 1) + << BSON("host" + << "node3:12345" + << "_id" + << 2)) + << "protocolVersion" + << 1), + HostAndPort("node1", 12345)); + getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY); + + // Update committed optime with ReplSetMetadata. + OpTime optime1{Timestamp(10, 0), 3}; + OpTime optime2{Timestamp(11, 2), 5}; + + StatusWith<rpc::ReplSetMetadata> metadataToProcess = rpc::ReplSetMetadata::readFromMetadata( + BSON(rpc::kReplSetMetadataFieldName + << BSON("lastOpCommitted" << optime1.toBSON() << "lastOpVisible" << optime1.toBSON() + << "configVersion" + << 2 + << "primaryIndex" + << 2 + << "term" + << 3 + << "syncSourceIndex" + << 1))); + getReplCoord()->processReplSetMetadata(metadataToProcess.getValue(), true); + + getReplCoord()->setMyLastAppliedOpTime(optime2); + + // Get current rbid to check against. + BSONObjBuilder result; + getReplCoord()->processReplSetGetRBID(&result); + int initialValue = result.obj()["rbid"].Int(); + + BSONObjBuilder metadataBob; + getReplCoord()->prepareReplMetadata( + BSON(rpc::kOplogQueryMetadataFieldName << 1 << rpc::kReplSetMetadataFieldName << 1), + OpTime(), + &metadataBob); + + BSONObj metadata = metadataBob.done(); + log() << metadata; + + auto oqMetadata = rpc::OplogQueryMetadata::readFromMetadata(metadata); + ASSERT_OK(oqMetadata.getStatus()); + ASSERT_EQ(oqMetadata.getValue().getLastOpCommitted(), optime1); + ASSERT_EQ(oqMetadata.getValue().getLastOpApplied(), optime2); + ASSERT_EQ(oqMetadata.getValue().getRBID(), initialValue); + ASSERT_EQ(oqMetadata.getValue().getSyncSourceIndex(), -1); + ASSERT_EQ(oqMetadata.getValue().getPrimaryIndex(), -1); + + auto replMetadata = rpc::ReplSetMetadata::readFromMetadata(metadata); + ASSERT_OK(replMetadata.getStatus()); + ASSERT_EQ(replMetadata.getValue().getLastOpCommitted(), optime1); + ASSERT_EQ(replMetadata.getValue().getLastOpVisible(), OpTime()); + ASSERT_EQ(replMetadata.getValue().getConfigVersion(), 2); + ASSERT_EQ(replMetadata.getValue().getTerm(), 3); + ASSERT_EQ(replMetadata.getValue().getSyncSourceIndex(), -1); + ASSERT_EQ(replMetadata.getValue().getPrimaryIndex(), -1); +} + TEST_F(ReplCoordTest, TermAndLastCommittedOpTimeUpdatedFromHeartbeatWhenArbiter) { // Ensure that the metadata is processed if it is contained in a heartbeat response. assertStartSuccess(BSON("_id" diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 077fab7ddec..11ffe4cfde7 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -393,7 +393,8 @@ Status ReplicationCoordinatorMock::processReplSetRequestVotes( return Status::OK(); } -void ReplicationCoordinatorMock::prepareReplMetadata(const OpTime& lastOpTimeFromClient, +void ReplicationCoordinatorMock::prepareReplMetadata(const BSONObj& metadataRequestObj, + const OpTime& lastOpTimeFromClient, BSONObjBuilder* builder) const {} Status ReplicationCoordinatorMock::processHeartbeatV1(const ReplSetHeartbeatArgsV1& args, diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index a5b75441b88..c57cd450454 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -219,7 +219,8 @@ public: const ReplSetRequestVotesArgs& args, ReplSetRequestVotesResponse* response); - void prepareReplMetadata(const OpTime& lastOpTimeFromClient, + void prepareReplMetadata(const BSONObj& metadataRequestObj, + const OpTime& lastOpTimeFromClient, BSONObjBuilder* builder) const override; virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args, diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index 26462a99c68..7dcf76164ec 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -435,11 +435,19 @@ public: virtual void setMyHeartbeatMessage(const Date_t now, const std::string& s) = 0; /** - * Prepares a BSONObj describing the current term, primary, and lastOp information. + * Prepares a ReplSetMetadata object describing the current term, primary, and lastOp + * information. */ - virtual void prepareReplMetadata(rpc::ReplSetMetadata* metadata, - const OpTime& lastVisibleOpTime, - const OpTime& lastCommittedOpTime) const = 0; + virtual rpc::ReplSetMetadata prepareReplSetMetadata( + const OpTime& lastVisibleOpTime, const OpTime& lastCommittedOpTime) const = 0; + + /** + * Prepares an OplogQueryMetadata object describing the current sync source, rbid, primary, + * lastOpApplied, and lastOpCommitted. + */ + virtual rpc::OplogQueryMetadata prepareOplogQueryMetadata(const OpTime& lastCommittedOpTime, + const OpTime& lastAppliedOpTime, + int rbid) const = 0; /** * Writes into 'output' all the information needed to generate a summary of the current diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp index 44ffd2124c5..f4e9c01cea7 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl.cpp @@ -48,6 +48,7 @@ #include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/rslog.h" #include "mongo/db/server_parameters.h" +#include "mongo/rpc/metadata/oplog_query_metadata.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/hex.h" @@ -2528,17 +2529,24 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS return false; } -void TopologyCoordinatorImpl::prepareReplMetadata(rpc::ReplSetMetadata* metadata, - const OpTime& lastVisibleOpTime, - const OpTime& lastCommittedOpTime) const { - *metadata = - rpc::ReplSetMetadata(_term, - lastCommittedOpTime, - lastVisibleOpTime, - _rsConfig.getConfigVersion(), - _rsConfig.getReplicaSetId(), - _currentPrimaryIndex, - _rsConfig.findMemberIndexByHostAndPort(getSyncSourceAddress())); +rpc::ReplSetMetadata TopologyCoordinatorImpl::prepareReplSetMetadata( + const OpTime& lastVisibleOpTime, const OpTime& lastCommittedOpTime) const { + return rpc::ReplSetMetadata(_term, + lastCommittedOpTime, + lastVisibleOpTime, + _rsConfig.getConfigVersion(), + _rsConfig.getReplicaSetId(), + _currentPrimaryIndex, + _rsConfig.findMemberIndexByHostAndPort(getSyncSourceAddress())); +} + +rpc::OplogQueryMetadata TopologyCoordinatorImpl::prepareOplogQueryMetadata( + const OpTime& lastCommittedOpTime, const OpTime& lastAppliedOpTime, int rbid) const { + return rpc::OplogQueryMetadata(lastCommittedOpTime, + lastAppliedOpTime, + rbid, + _currentPrimaryIndex, + _rsConfig.findMemberIndexByHostAndPort(getSyncSourceAddress())); } void TopologyCoordinatorImpl::summarizeAsHtml(ReplSetHtmlSummary* output) { diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h index 569dcb7b6a2..f57d945465e 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.h +++ b/src/mongo/db/repl/topology_coordinator_impl.h @@ -223,9 +223,11 @@ public: const OpTime& lastOpCommitted); virtual bool stepDownIfPending(); virtual Date_t getStepDownTime() const; - virtual void prepareReplMetadata(rpc::ReplSetMetadata* metadata, - const OpTime& lastVisibleOpTime, - const OpTime& lastCommitttedOpTime) const; + virtual rpc::ReplSetMetadata prepareReplSetMetadata(const OpTime& lastVisibleOpTime, + const OpTime& lastCommitttedOpTime) const; + virtual rpc::OplogQueryMetadata prepareOplogQueryMetadata(const OpTime& lastCommittedOpTime, + const OpTime& lastAppliedOpTime, + int rbid) const; virtual void processReplSetRequestVotes(const ReplSetRequestVotesArgs& args, ReplSetRequestVotesResponse* response, const OpTime& lastAppliedOpTime); |