summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Judah Schvimer <judah@mongodb.com> 2017-12-06 11:52:29 -0500
committer Judah Schvimer <judah@mongodb.com> 2017-12-06 17:33:01 -0500
commit cb5381a8304a0d525328d17823948848485e5a05 (patch)
tree 542ee7873cd9c475cc215c68a5e674c772deea38
parent 2b065f5f92847901f917183674ce364263300b8c (diff)
download mongo-cb5381a8304a0d525328d17823948848485e5a05.tar.gz
SERVER-32178 Do not use IDL on oldest oplog entry
(cherry picked from commit cef82b45ca182e788ba33aa6cb034b34a63b7b56)
-rw-r--r-- src/mongo/db/repl/sync_source_resolver.cpp 23
-rw-r--r-- src/mongo/db/repl/sync_source_resolver_test.cpp 52
2 files changed, 67 insertions, 8 deletions
diff --git a/src/mongo/db/repl/sync_source_resolver.cpp b/src/mongo/db/repl/sync_source_resolver.cpp
index ebb9f9649c7..f95e5ae5aeb 100644
--- a/src/mongo/db/repl/sync_source_resolver.cpp
+++ b/src/mongo/db/repl/sync_source_resolver.cpp
@@ -168,7 +168,11 @@ std::unique_ptr<Fetcher> SyncSourceResolver::_makeFirstOplogEntryFetcher(
_taskExecutor,
candidate,
kLocalOplogNss.db().toString(),
- BSON("find" << kLocalOplogNss.coll() << "limit" << 1 << "sort" << BSON("$natural" << 1)),
+ BSON("find" << kLocalOplogNss.coll() << "limit" << 1 << "sort" << BSON("$natural" << 1)
+ << "projection"
+ << BSON(OplogEntryBase::kTimestampFieldName << 1
+ << OplogEntryBase::kTermFieldName
+ << 1)),
stdx::bind(&SyncSourceResolver::_firstOplogEntryFetcherCallback,
this,
stdx::placeholders::_1,
@@ -242,9 +246,18 @@ OpTime SyncSourceResolver::_parseRemoteEarliestOpTime(const HostAndPort& candida
return OpTime();
}
- const OplogEntry oplogEntry(firstObjFound);
- const auto remoteEarliestOpTime = oplogEntry.getOpTime();
- if (remoteEarliestOpTime.isNull()) {
+ const auto remoteEarliestOpTime = OpTime::parseFromOplogEntry(firstObjFound);
+ if (!remoteEarliestOpTime.isOK()) {
+ const auto until = _taskExecutor->now() + kFirstOplogEntryNullTimestampBlacklistDuration;
+ log() << "Blacklisting " << candidate << " due to error parsing OpTime from the oldest"
+ << " oplog entry for " << kFirstOplogEntryNullTimestampBlacklistDuration
+ << " until: " << until << ". Error: " << remoteEarliestOpTime.getStatus()
+ << ", Entry: " << redact(firstObjFound);
+ _syncSourceSelector->blacklistSyncSource(candidate, until);
+ return OpTime();
+ }
+
+ if (remoteEarliestOpTime.getValue().isNull()) {
// First document in remote oplog is empty.
const auto until = _taskExecutor->now() + kFirstOplogEntryNullTimestampBlacklistDuration;
log() << "Blacklisting " << candidate << " due to null timestamp in first document for "
@@ -253,7 +266,7 @@ OpTime SyncSourceResolver::_parseRemoteEarliestOpTime(const HostAndPort& candida
return OpTime();
}
- return remoteEarliestOpTime;
+ return remoteEarliestOpTime.getValue();
}
void SyncSourceResolver::_firstOplogEntryFetcherCallback(
diff --git a/src/mongo/db/repl/sync_source_resolver_test.cpp b/src/mongo/db/repl/sync_source_resolver_test.cpp
index 9599fe93c3c..e6639fee1c9 100644
--- a/src/mongo/db/repl/sync_source_resolver_test.cpp
+++ b/src/mongo/db/repl/sync_source_resolver_test.cpp
@@ -535,7 +535,7 @@ TEST_F(SyncSourceResolverTest,
TEST_F(SyncSourceResolverTest,
SyncSourceResolverWillTryOtherSourcesWhenTheFirstNodeHasAnEmptyOplog) {
HostAndPort candidate1("node1", 12345);
- HostAndPort candidate2("node1", 12345);
+ HostAndPort candidate2("node2", 12345);
_selector->setChooseNewSyncSourceResult_forTest(candidate1);
ASSERT_OK(_resolver->startup());
ASSERT_TRUE(_resolver->isActive());
@@ -560,7 +560,7 @@ TEST_F(SyncSourceResolverTest,
TEST_F(SyncSourceResolverTest,
SyncSourceResolverWillTryOtherSourcesWhenTheFirstNodeHasAnEmptyFirstOplogEntry) {
HostAndPort candidate1("node1", 12345);
- HostAndPort candidate2("node1", 12345);
+ HostAndPort candidate2("node2", 12345);
_selector->setChooseNewSyncSourceResult_forTest(candidate1);
ASSERT_OK(_resolver->startup());
ASSERT_TRUE(_resolver->isActive());
@@ -583,9 +583,35 @@ TEST_F(SyncSourceResolverTest,
}
TEST_F(SyncSourceResolverTest,
+ SyncSourceResolverWillTryOtherSourcesWhenFirstNodeContainsBadOplogEntry) {
+ HostAndPort candidate1("node1", 12345);
+ HostAndPort candidate2("node2", 12345);
+ _selector->setChooseNewSyncSourceResult_forTest(candidate1);
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate1, candidate2, {BSON("t" << 1)});
+
+ ASSERT_TRUE(_resolver->isActive());
+ ASSERT_EQUALS(candidate1, _selector->getLastBlacklistedSyncSource_forTest());
+ ASSERT_EQUALS(getExecutor().now() +
+ SyncSourceResolver::kFirstOplogEntryNullTimestampBlacklistDuration,
+ _selector->getLastBlacklistExpiration_forTest());
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2));
+ _scheduleRBIDResponse(getNet(), candidate2);
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus));
+}
+
+TEST_F(SyncSourceResolverTest,
SyncSourceResolverWillTryOtherSourcesWhenFirstNodeContainsOplogEntryWithNullTimestamp) {
HostAndPort candidate1("node1", 12345);
- HostAndPort candidate2("node1", 12345);
+ HostAndPort candidate2("node2", 12345);
_selector->setChooseNewSyncSourceResult_forTest(candidate1);
ASSERT_OK(_resolver->startup());
ASSERT_TRUE(_resolver->isActive());
@@ -608,6 +634,26 @@ TEST_F(SyncSourceResolverTest,
ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus));
}
+
+TEST_F(SyncSourceResolverTest, SyncSourceResolverWillSucceedWithExtraFields) {
+ HostAndPort candidate1("node1", 12345);
+ _selector->setChooseNewSyncSourceResult_forTest(candidate1);
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleFirstOplogEntryFetcherResponse(getNet(),
+ _selector.get(),
+ candidate1,
+ HostAndPort(),
+ {BSON("ts" << Timestamp(1, 1) << "t" << 1 << "note"
+ << "a")});
+
+ _scheduleRBIDResponse(getNet(), candidate1);
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(candidate1, unittest::assertGet(_response.syncSourceStatus));
+}
/**
* Constructs and schedules a network interface response using the given documents to the required
* optime on the sync source candidate.