summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorLingzhi Deng <lingzhi.deng@mongodb.com>2020-02-19 16:23:13 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-02-20 23:37:51 +0000
commit830fbc2939c577f0904ba1202c1eca9d8e921c54 (patch)
tree8a718a578d753f68203b0c8924a27bf104784344 /src/mongo/db
parente4571070e9659a1ff9335eb0382a6fa0d611bfcf (diff)
downloadmongo-830fbc2939c577f0904ba1202c1eca9d8e921c54.tar.gz
SERVER-46257: OplogFetcher should run LogicalTimeMetadataHook on DBClientConnection
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/repl/SConscript1
-rw-r--r--src/mongo/db/repl/oplog_fetcher.cpp15
-rw-r--r--src/mongo/db/repl/oplog_fetcher.h4
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp6
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_test.cpp10
-rw-r--r--src/mongo/db/repl/replication_coordinator_test_fixture.cpp17
-rw-r--r--src/mongo/db/repl/replication_coordinator_test_fixture.h12
8 files changed, 49 insertions, 18 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 8d40c7b3b9b..4890f4f245a 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -854,6 +854,7 @@ env.Library(
'replica_set_messages',
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/client/clientdriver_network',
+ '$BUILD_DIR/mongo/db/logical_time_metadata_hook',
'$BUILD_DIR/mongo/db/namespace_string',
'$BUILD_DIR/mongo/db/stats/counters',
'$BUILD_DIR/mongo/db/stats/timer_stats',
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index e02922be165..496875b97e5 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -29,8 +29,6 @@
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
-#include "mongo/platform/basic.h"
-
#include "mongo/db/repl/oplog_fetcher.h"
#include "mongo/base/counter.h"
@@ -636,17 +634,26 @@ Status OplogFetcher::_connect() {
void OplogFetcher::_setMetadataWriterAndReader() {
invariant(_conn);
+ _logicalTimeMetadataHook =
+ std::make_unique<rpc::LogicalTimeMetadataHook>(getGlobalServiceContext());
+
_conn->setRequestMetadataWriter([this](OperationContext* opCtx, BSONObjBuilder* metadataBob) {
*metadataBob << rpc::kReplSetMetadataFieldName << 1;
*metadataBob << rpc::kOplogQueryMetadataFieldName << 1;
metadataBob->appendElements(ReadPreferenceSetting::secondaryPreferredMetadata());
- return Status::OK();
+
+ // Run LogicalTimeMetadataHook on request metadata so this matches the behavior of the
+ // connections in the replication coordinator thread pool.
+ return _logicalTimeMetadataHook->writeRequestMetadata(opCtx, metadataBob);
});
_conn->setReplyMetadataReader(
[this](OperationContext* opCtx, const BSONObj& metadataObj, StringData source) {
_metadataObj = metadataObj.getOwned();
- return Status::OK();
+
+ // Run LogicalTimeMetadataHook on reply metadata so this matches the behavior of the
+ // connections in the replication coordinator thread pool.
+ return _logicalTimeMetadataHook->readReplyMetadata(opCtx, source, _metadataObj);
});
}
diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h
index c24b4309227..ad92d104aa3 100644
--- a/src/mongo/db/repl/oplog_fetcher.h
+++ b/src/mongo/db/repl/oplog_fetcher.h
@@ -37,6 +37,7 @@
#include "mongo/client/dbclient_connection.h"
#include "mongo/client/dbclient_cursor.h"
#include "mongo/client/fetcher.h"
+#include "mongo/db/logical_time_metadata_hook.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/abstract_async_component.h"
#include "mongo/db/repl/data_replicator_external_state.h"
@@ -371,6 +372,9 @@ private:
// Used to keep track of the last oplog entry read and processed from the sync source.
OpTime _lastFetched;
+ // Logical time metadata handling hook for the DBClientConnection.
+ std::unique_ptr<rpc::LogicalTimeMetadataHook> _logicalTimeMetadataHook;
+
// Set by the ReplyMetadataReader upon receiving a new batch.
BSONObj _metadataObj;
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index a85ba3a627f..bf5c9671260 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -4259,6 +4259,12 @@ void ReplicationCoordinatorImpl::_advanceCommitPoint(
WithLock lk, const OpTimeAndWallTime& committedOpTimeAndWallTime, bool fromSyncSource) {
if (_topCoord->advanceLastCommittedOpTimeAndWallTime(committedOpTimeAndWallTime,
fromSyncSource)) {
+ // The last committed opTime should never advance beyond the global timestamp (i.e. the
+ // latest cluster time). Not enforced if the logical clock is disabled, e.g. for arbiters.
+ dassert(!LogicalClock::get(getServiceContext())->isEnabled() ||
+ _externalState->getGlobalTimestamp(getServiceContext()) >=
+ committedOpTimeAndWallTime.opTime.getTimestamp());
+
if (_getMemberState_inlock().arbiter()) {
// Arbiters do not store replicated data, so we consider their data trivially
// consistent.
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
index 27271617d88..2fdaa9df462 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
@@ -899,6 +899,7 @@ TEST_F(ReplCoordHBV1Test,
// Update lastApplied, so commit point can be advanced.
replCoordSetMyLastAppliedOpTime(opTime2, Date_t() + Seconds(100));
+ getExternalState()->setGlobalTimestamp(getGlobalServiceContext(), commitPoint.getTimestamp());
{
net->enterNetwork();
net->runUntil(net->now() + config.getHeartbeatInterval());
@@ -976,6 +977,7 @@ TEST_F(ReplCoordHBV1Test, LastCommittedOpTimeOnlyUpdatesFromHeartbeatIfNotInStar
// Set follower mode to SECONDARY so commit point can be advanced through heartbeats.
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
+ getExternalState()->setGlobalTimestamp(getGlobalServiceContext(), commitPoint.getTimestamp());
{
net->enterNetwork();
net->runUntil(net->now() + config.getHeartbeatInterval());
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index 35c002d78b4..b3266e9c264 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -5579,13 +5579,13 @@ TEST_F(ReplCoordTest, UpdateLastCommittedOpTimeWhenTheLastCommittedOpTimeIsNewer
replCoordSetMyLastAppliedOpTime(time, wallTime);
// higher OpTime, should change
- getReplCoord()->advanceCommitPoint({time, wallTime}, false);
+ replCoordAdvanceCommitPoint({time, wallTime}, false);
ASSERT_EQUALS(time, getReplCoord()->getLastCommittedOpTime());
ASSERT_EQUALS(wallTime, getReplCoord()->getLastCommittedOpTimeAndWallTime().wallTime);
ASSERT_EQUALS(time, getReplCoord()->getCurrentCommittedSnapshotOpTime());
// lower OpTime, should not change
- getReplCoord()->advanceCommitPoint({oldTime, Date_t() + Seconds(5)}, false);
+ replCoordAdvanceCommitPoint({oldTime, Date_t() + Seconds(5)}, false);
ASSERT_EQUALS(time, getReplCoord()->getLastCommittedOpTime());
ASSERT_EQUALS(wallTime, getReplCoord()->getLastCommittedOpTimeAndWallTime().wallTime);
ASSERT_EQUALS(time, getReplCoord()->getCurrentCommittedSnapshotOpTime());
@@ -5724,7 +5724,7 @@ TEST_F(ReplCoordTest, AdvanceCommitPointFromSyncSourceCanSetCommitPointToLastApp
replCoordSetMyLastAppliedOpTime(lastApplied.opTime, lastApplied.wallTime);
const bool fromSyncSource = true;
- getReplCoord()->advanceCommitPoint(commitPoint, fromSyncSource);
+ replCoordAdvanceCommitPoint(commitPoint, fromSyncSource);
// The commit point can be set to lastApplied, even though lastApplied is in a lower term.
ASSERT_EQUALS(lastApplied.opTime, getReplCoord()->getLastCommittedOpTime());
@@ -5754,7 +5754,7 @@ TEST_F(ReplCoordTest, PrepareOplogQueryMetadata) {
replCoordSetMyLastAppliedOpTime(optime2, wallTime2);
// pass dummy Date_t to avoid advanceCommitPoint invariant
- getReplCoord()->advanceCommitPoint({optime1, wallTime1}, false);
+ replCoordAdvanceCommitPoint({optime1, wallTime1}, false);
auto opCtx = makeOperationContext();
@@ -6533,7 +6533,7 @@ TEST_F(ReplCoordTest, UpdatePositionCmdHasMetadata) {
optime.getTerm(), {optime, Date_t() + Seconds(optime.getSecs())}, optime, 1, OID(), -1, 1);
getReplCoord()->processReplSetMetadata(syncSourceMetadata);
// Pass dummy Date_t to avoid advanceCommitPoint invariant.
- getReplCoord()->advanceCommitPoint({optime, Date_t() + Seconds(optime.getSecs())}, true);
+ replCoordAdvanceCommitPoint({optime, Date_t() + Seconds(optime.getSecs())}, true);
BSONObj cmd = unittest::assertGet(getReplCoord()->prepareReplSetUpdatePositionCommand());
auto metadata = unittest::assertGet(rpc::ReplSetMetadata::readFromMetadata(cmd));
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
index 4d2c4c82dd4..5aa9fd122d5 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
@@ -506,5 +506,22 @@ void ReplCoordTest::simulateCatchUpAbort() {
net->exitNetwork();
}
+void ReplCoordTest::replCoordAdvanceCommitPoint(const OpTime& opTime,
+ Date_t wallTime,
+ bool fromSyncSource) {
+ if (wallTime == Date_t()) {
+ wallTime = Date_t() + Seconds(opTime.getSecs());
+ }
+ getExternalState()->setGlobalTimestamp(getGlobalServiceContext(), opTime.getTimestamp());
+ getReplCoord()->advanceCommitPoint({opTime, wallTime}, fromSyncSource);
+}
+
+void ReplCoordTest::replCoordAdvanceCommitPoint(const OpTimeAndWallTime& opTimeAndWallTime,
+ bool fromSyncSource) {
+ getExternalState()->setGlobalTimestamp(getGlobalServiceContext(),
+ opTimeAndWallTime.opTime.getTimestamp());
+ getReplCoord()->advanceCommitPoint(opTimeAndWallTime, fromSyncSource);
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.h b/src/mongo/db/repl/replication_coordinator_test_fixture.h
index 7dbdb4e957b..bdda299afdc 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.h
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.h
@@ -137,16 +137,10 @@ protected:
void replCoordAdvanceCommitPoint(const OpTime& opTime,
Date_t wallTime = Date_t(),
- bool fromSyncSource = false) {
- if (wallTime == Date_t()) {
- wallTime = Date_t() + Seconds(opTime.getSecs());
- }
- getReplCoord()->advanceCommitPoint({opTime, wallTime}, fromSyncSource);
- }
+ bool fromSyncSource = false);
- void replCoordAdvanceCommitPoint(const OpTimeAndWallTime& opTime, bool fromSyncSource = false) {
- getReplCoord()->advanceCommitPoint(opTime, fromSyncSource);
- }
+ void replCoordAdvanceCommitPoint(const OpTimeAndWallTime& opTimeAndWallTime,
+ bool fromSyncSource = false);
/**
* Gets the storage interface.