From 6377ae0fd9c4632d7cefc44ba80b173e958295d2 Mon Sep 17 00:00:00 2001 From: Lingzhi Deng Date: Fri, 21 Feb 2020 08:22:57 -0500 Subject: SERVER-46257: OplogFetcher should run LogicalTimeMetadataHook on DBClientConnection --- src/mongo/db/repl/SConscript | 1 + src/mongo/db/repl/oplog_fetcher.cpp | 15 +++++++++++---- src/mongo/db/repl/oplog_fetcher.h | 4 ++++ src/mongo/db/repl/oplog_fetcher_test.cpp | 30 ++++++++++++++++++++++++++++++ 4 files changed, 46 insertions(+), 4 deletions(-) diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index c20f9d37deb..4a05bea3c75 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -867,6 +867,7 @@ env.Library( 'replica_set_messages', '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/client/clientdriver_network', + '$BUILD_DIR/mongo/db/logical_time_metadata_hook', '$BUILD_DIR/mongo/db/namespace_string', '$BUILD_DIR/mongo/db/stats/counters', '$BUILD_DIR/mongo/db/stats/timer_stats', diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp index 6ed2ac7f1a7..614e5efb0df 100644 --- a/src/mongo/db/repl/oplog_fetcher.cpp +++ b/src/mongo/db/repl/oplog_fetcher.cpp @@ -29,8 +29,6 @@ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication -#include "mongo/platform/basic.h" - #include "mongo/db/repl/oplog_fetcher.h" #include "mongo/base/counter.h" @@ -635,17 +633,26 @@ Status OplogFetcher::_connect() { void OplogFetcher::_setMetadataWriterAndReader() { invariant(_conn); + _logicalTimeMetadataHook = + std::make_unique(getGlobalServiceContext()); + _conn->setRequestMetadataWriter([this](OperationContext* opCtx, BSONObjBuilder* metadataBob) { *metadataBob << rpc::kReplSetMetadataFieldName << 1; *metadataBob << rpc::kOplogQueryMetadataFieldName << 1; metadataBob->appendElements(ReadPreferenceSetting::secondaryPreferredMetadata()); - return Status::OK(); + + // Run LogicalTimeMetadataHook on request metadata so this matches the behavior of the + // connections in the replication coordinator thread pool. + return _logicalTimeMetadataHook->writeRequestMetadata(opCtx, metadataBob); }); _conn->setReplyMetadataReader( [this](OperationContext* opCtx, const BSONObj& metadataObj, StringData source) { _metadataObj = metadataObj.getOwned(); - return Status::OK(); + + // Run LogicalTimeMetadataHook on reply metadata so this matches the behavior of the + // connections in the replication coordinator thread pool. + return _logicalTimeMetadataHook->readReplyMetadata(opCtx, source, _metadataObj); }); } diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h index c24b4309227..ad92d104aa3 100644 --- a/src/mongo/db/repl/oplog_fetcher.h +++ b/src/mongo/db/repl/oplog_fetcher.h @@ -37,6 +37,7 @@ #include "mongo/client/dbclient_connection.h" #include "mongo/client/dbclient_cursor.h" #include "mongo/client/fetcher.h" +#include "mongo/db/logical_time_metadata_hook.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/abstract_async_component.h" #include "mongo/db/repl/data_replicator_external_state.h" @@ -371,6 +372,9 @@ private: // Used to keep track of the last oplog entry read and processed from the sync source. OpTime _lastFetched; + // Logical time metadata handling hook for the DBClientConnection. + std::unique_ptr _logicalTimeMetadataHook; + // Set by the ReplyMetadataReader upon receiving a new batch. BSONObj _metadataObj; diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp index 144de2076bd..ae50946d213 100644 --- a/src/mongo/db/repl/oplog_fetcher_test.cpp +++ b/src/mongo/db/repl/oplog_fetcher_test.cpp @@ -29,6 +29,7 @@ #include +#include "mongo/db/logical_clock.h" #include "mongo/db/repl/data_replicator_external_state_mock.h" #include "mongo/db/repl/oplog_fetcher.h" #include "mongo/db/repl/repl_server_parameters_gen.h" @@ -37,6 +38,7 @@ #include "mongo/dbtests/mock/mock_dbclient_connection.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/rpc/metadata.h" +#include "mongo/rpc/metadata/logical_time_metadata.h" #include "mongo/rpc/metadata/oplog_query_metadata.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/unittest/death_test.h" @@ -358,6 +360,11 @@ void OplogFetcherTest::setUp() { // Always enable oplogFetcherUsesExhaust at the beginning of each unittest in case some // unittests disable it in the test. oplogFetcherUsesExhaust = true; + + // Set up a logical clock. + auto service = getGlobalServiceContext(); + auto logicalClock = std::make_unique(service); + LogicalClock::set(service, std::move(logicalClock)); } std::unique_ptr OplogFetcherTest::makeOplogFetcher() { @@ -2114,4 +2121,27 @@ TEST_F(OplogFetcherTest, GetMoreEmptyBatch) { ASSERT_OK(shutdownState.getStatus()); } + +TEST_F(OplogFetcherTest, HandleLogicalTimeMetaDataAndAdvanceClusterTime) { + auto firstEntry = makeNoopOplogEntry(lastFetched); + + auto oldClusterTime = LogicalClock::get(getGlobalServiceContext())->getClusterTime(); + + auto logicalTime = LogicalTime(Timestamp(123456, 78)); + auto logicalTimeMetadata = + rpc::LogicalTimeMetadata(SignedLogicalTime(logicalTime, TimeProofService::TimeProof(), 0)); + + BSONObjBuilder bob; + ASSERT_OK(oqMetadata.writeToMetadata(&bob)); + logicalTimeMetadata.writeToMetadata(&bob); + auto metadataObj = bob.obj(); + + // Process one batch with the logical time metadata. + ASSERT_OK(processSingleBatch(makeFirstBatch(0LL, {firstEntry}, metadataObj))->getStatus()); + + // Test that the cluster time is updated to the cluster time in the metadata. + auto currentClusterTime = LogicalClock::get(getGlobalServiceContext())->getClusterTime(); + ASSERT_EQ(currentClusterTime, logicalTime); + ASSERT_NE(oldClusterTime, logicalTime); +} } // namespace -- cgit v1.2.1