summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLingzhi Deng <lingzhi.deng@mongodb.com>2020-02-21 08:22:57 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-02-22 04:36:49 +0000
commit6377ae0fd9c4632d7cefc44ba80b173e958295d2 (patch)
treeb7f951761f8f7fc421f68daa3c7368e92ca37a63
parent864fbf14ed92ce263c4481b960285fce6f407fb6 (diff)
downloadmongo-6377ae0fd9c4632d7cefc44ba80b173e958295d2.tar.gz
SERVER-46257: OplogFetcher should run LogicalTimeMetadataHook on DBClientConnection
-rw-r--r--src/mongo/db/repl/SConscript1
-rw-r--r--src/mongo/db/repl/oplog_fetcher.cpp15
-rw-r--r--src/mongo/db/repl/oplog_fetcher.h4
-rw-r--r--src/mongo/db/repl/oplog_fetcher_test.cpp30
4 files changed, 46 insertions, 4 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index c20f9d37deb..4a05bea3c75 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -867,6 +867,7 @@ env.Library(
'replica_set_messages',
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/client/clientdriver_network',
+ '$BUILD_DIR/mongo/db/logical_time_metadata_hook',
'$BUILD_DIR/mongo/db/namespace_string',
'$BUILD_DIR/mongo/db/stats/counters',
'$BUILD_DIR/mongo/db/stats/timer_stats',
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index 6ed2ac7f1a7..614e5efb0df 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -29,8 +29,6 @@
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
-#include "mongo/platform/basic.h"
-
#include "mongo/db/repl/oplog_fetcher.h"
#include "mongo/base/counter.h"
@@ -635,17 +633,26 @@ Status OplogFetcher::_connect() {
void OplogFetcher::_setMetadataWriterAndReader() {
invariant(_conn);
+ _logicalTimeMetadataHook =
+ std::make_unique<rpc::LogicalTimeMetadataHook>(getGlobalServiceContext());
+
_conn->setRequestMetadataWriter([this](OperationContext* opCtx, BSONObjBuilder* metadataBob) {
*metadataBob << rpc::kReplSetMetadataFieldName << 1;
*metadataBob << rpc::kOplogQueryMetadataFieldName << 1;
metadataBob->appendElements(ReadPreferenceSetting::secondaryPreferredMetadata());
- return Status::OK();
+
+ // Run LogicalTimeMetadataHook on request metadata so this matches the behavior of the
+ // connections in the replication coordinator thread pool.
+ return _logicalTimeMetadataHook->writeRequestMetadata(opCtx, metadataBob);
});
_conn->setReplyMetadataReader(
[this](OperationContext* opCtx, const BSONObj& metadataObj, StringData source) {
_metadataObj = metadataObj.getOwned();
- return Status::OK();
+
+ // Run LogicalTimeMetadataHook on reply metadata so this matches the behavior of the
+ // connections in the replication coordinator thread pool.
+ return _logicalTimeMetadataHook->readReplyMetadata(opCtx, source, _metadataObj);
});
}
diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h
index c24b4309227..ad92d104aa3 100644
--- a/src/mongo/db/repl/oplog_fetcher.h
+++ b/src/mongo/db/repl/oplog_fetcher.h
@@ -37,6 +37,7 @@
#include "mongo/client/dbclient_connection.h"
#include "mongo/client/dbclient_cursor.h"
#include "mongo/client/fetcher.h"
+#include "mongo/db/logical_time_metadata_hook.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/abstract_async_component.h"
#include "mongo/db/repl/data_replicator_external_state.h"
@@ -371,6 +372,9 @@ private:
// Used to keep track of the last oplog entry read and processed from the sync source.
OpTime _lastFetched;
+ // Logical time metadata handling hook for the DBClientConnection.
+ std::unique_ptr<rpc::LogicalTimeMetadataHook> _logicalTimeMetadataHook;
+
// Set by the ReplyMetadataReader upon receiving a new batch.
BSONObj _metadataObj;
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
index 144de2076bd..ae50946d213 100644
--- a/src/mongo/db/repl/oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -29,6 +29,7 @@
#include <memory>
+#include "mongo/db/logical_clock.h"
#include "mongo/db/repl/data_replicator_external_state_mock.h"
#include "mongo/db/repl/oplog_fetcher.h"
#include "mongo/db/repl/repl_server_parameters_gen.h"
@@ -37,6 +38,7 @@
#include "mongo/dbtests/mock/mock_dbclient_connection.h"
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/rpc/metadata.h"
+#include "mongo/rpc/metadata/logical_time_metadata.h"
#include "mongo/rpc/metadata/oplog_query_metadata.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/unittest/death_test.h"
@@ -358,6 +360,11 @@ void OplogFetcherTest::setUp() {
// Always enable oplogFetcherUsesExhaust at the beginning of each unittest in case some
// unittests disable it in the test.
oplogFetcherUsesExhaust = true;
+
+ // Set up a logical clock.
+ auto service = getGlobalServiceContext();
+ auto logicalClock = std::make_unique<LogicalClock>(service);
+ LogicalClock::set(service, std::move(logicalClock));
}
std::unique_ptr<OplogFetcher> OplogFetcherTest::makeOplogFetcher() {
@@ -2114,4 +2121,27 @@ TEST_F(OplogFetcherTest, GetMoreEmptyBatch) {
ASSERT_OK(shutdownState.getStatus());
}
+
+TEST_F(OplogFetcherTest, HandleLogicalTimeMetaDataAndAdvanceClusterTime) {
+ auto firstEntry = makeNoopOplogEntry(lastFetched);
+
+ auto oldClusterTime = LogicalClock::get(getGlobalServiceContext())->getClusterTime();
+
+ auto logicalTime = LogicalTime(Timestamp(123456, 78));
+ auto logicalTimeMetadata =
+ rpc::LogicalTimeMetadata(SignedLogicalTime(logicalTime, TimeProofService::TimeProof(), 0));
+
+ BSONObjBuilder bob;
+ ASSERT_OK(oqMetadata.writeToMetadata(&bob));
+ logicalTimeMetadata.writeToMetadata(&bob);
+ auto metadataObj = bob.obj();
+
+ // Process one batch with the logical time metadata.
+ ASSERT_OK(processSingleBatch(makeFirstBatch(0LL, {firstEntry}, metadataObj))->getStatus());
+
+ // Test that the cluster time is updated to the cluster time in the metadata.
+ auto currentClusterTime = LogicalClock::get(getGlobalServiceContext())->getClusterTime();
+ ASSERT_EQ(currentClusterTime, logicalTime);
+ ASSERT_NE(oldClusterTime, logicalTime);
+}
} // namespace