diff options
author | Judah Schvimer <judah@mongodb.com> | 2017-12-06 11:52:29 -0500 |
---|---|---|
committer | Judah Schvimer <judah@mongodb.com> | 2017-12-06 17:33:01 -0500 |
commit | cb5381a8304a0d525328d17823948848485e5a05 (patch) | |
tree | 542ee7873cd9c475cc215c68a5e674c772deea38 | |
parent | 2b065f5f92847901f917183674ce364263300b8c (diff) | |
download | mongo-cb5381a8304a0d525328d17823948848485e5a05.tar.gz |
SERVER-32178 Do not use IDL on oldest oplog entry
(cherry picked from commit cef82b45ca182e788ba33aa6cb034b34a63b7b56)
-rw-r--r-- | src/mongo/db/repl/sync_source_resolver.cpp | 23 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_source_resolver_test.cpp | 52 |
2 files changed, 67 insertions, 8 deletions
diff --git a/src/mongo/db/repl/sync_source_resolver.cpp b/src/mongo/db/repl/sync_source_resolver.cpp index ebb9f9649c7..f95e5ae5aeb 100644 --- a/src/mongo/db/repl/sync_source_resolver.cpp +++ b/src/mongo/db/repl/sync_source_resolver.cpp @@ -168,7 +168,11 @@ std::unique_ptr<Fetcher> SyncSourceResolver::_makeFirstOplogEntryFetcher( _taskExecutor, candidate, kLocalOplogNss.db().toString(), - BSON("find" << kLocalOplogNss.coll() << "limit" << 1 << "sort" << BSON("$natural" << 1)), + BSON("find" << kLocalOplogNss.coll() << "limit" << 1 << "sort" << BSON("$natural" << 1) + << "projection" + << BSON(OplogEntryBase::kTimestampFieldName << 1 + << OplogEntryBase::kTermFieldName + << 1)), stdx::bind(&SyncSourceResolver::_firstOplogEntryFetcherCallback, this, stdx::placeholders::_1, @@ -242,9 +246,18 @@ OpTime SyncSourceResolver::_parseRemoteEarliestOpTime(const HostAndPort& candida return OpTime(); } - const OplogEntry oplogEntry(firstObjFound); - const auto remoteEarliestOpTime = oplogEntry.getOpTime(); - if (remoteEarliestOpTime.isNull()) { + const auto remoteEarliestOpTime = OpTime::parseFromOplogEntry(firstObjFound); + if (!remoteEarliestOpTime.isOK()) { + const auto until = _taskExecutor->now() + kFirstOplogEntryNullTimestampBlacklistDuration; + log() << "Blacklisting " << candidate << " due to error parsing OpTime from the oldest" + << " oplog entry for " << kFirstOplogEntryNullTimestampBlacklistDuration + << " until: " << until << ". Error: " << remoteEarliestOpTime.getStatus() + << ", Entry: " << redact(firstObjFound); + _syncSourceSelector->blacklistSyncSource(candidate, until); + return OpTime(); + } + + if (remoteEarliestOpTime.getValue().isNull()) { // First document in remote oplog is empty. const auto until = _taskExecutor->now() + kFirstOplogEntryNullTimestampBlacklistDuration; log() << "Blacklisting " << candidate << " due to null timestamp in first document for " @@ -253,7 +266,7 @@ OpTime SyncSourceResolver::_parseRemoteEarliestOpTime(const HostAndPort& candida return OpTime(); } - return remoteEarliestOpTime; + return remoteEarliestOpTime.getValue(); } void SyncSourceResolver::_firstOplogEntryFetcherCallback( diff --git a/src/mongo/db/repl/sync_source_resolver_test.cpp b/src/mongo/db/repl/sync_source_resolver_test.cpp index 9599fe93c3c..e6639fee1c9 100644 --- a/src/mongo/db/repl/sync_source_resolver_test.cpp +++ b/src/mongo/db/repl/sync_source_resolver_test.cpp @@ -535,7 +535,7 @@ TEST_F(SyncSourceResolverTest, TEST_F(SyncSourceResolverTest, SyncSourceResolverWillTryOtherSourcesWhenTheFirstNodeHasAnEmptyOplog) { HostAndPort candidate1("node1", 12345); - HostAndPort candidate2("node1", 12345); + HostAndPort candidate2("node2", 12345); _selector->setChooseNewSyncSourceResult_forTest(candidate1); ASSERT_OK(_resolver->startup()); ASSERT_TRUE(_resolver->isActive()); @@ -560,7 +560,7 @@ TEST_F(SyncSourceResolverTest, TEST_F(SyncSourceResolverTest, SyncSourceResolverWillTryOtherSourcesWhenTheFirstNodeHasAnEmptyFirstOplogEntry) { HostAndPort candidate1("node1", 12345); - HostAndPort candidate2("node1", 12345); + HostAndPort candidate2("node2", 12345); _selector->setChooseNewSyncSourceResult_forTest(candidate1); ASSERT_OK(_resolver->startup()); ASSERT_TRUE(_resolver->isActive()); @@ -583,9 +583,35 @@ TEST_F(SyncSourceResolverTest, } TEST_F(SyncSourceResolverTest, + SyncSourceResolverWillTryOtherSourcesWhenFirstNodeContainsBadOplogEntry) { + HostAndPort candidate1("node1", 12345); + HostAndPort candidate2("node2", 12345); + _selector->setChooseNewSyncSourceResult_forTest(candidate1); + ASSERT_OK(_resolver->startup()); + ASSERT_TRUE(_resolver->isActive()); + + _scheduleFirstOplogEntryFetcherResponse( + getNet(), _selector.get(), candidate1, candidate2, {BSON("t" << 1)}); + + ASSERT_TRUE(_resolver->isActive()); + ASSERT_EQUALS(candidate1, _selector->getLastBlacklistedSyncSource_forTest()); + ASSERT_EQUALS(getExecutor().now() + + SyncSourceResolver::kFirstOplogEntryNullTimestampBlacklistDuration, + _selector->getLastBlacklistExpiration_forTest()); + + _scheduleFirstOplogEntryFetcherResponse( + getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2)); + _scheduleRBIDResponse(getNet(), candidate2); + + _resolver->join(); + ASSERT_FALSE(_resolver->isActive()); + ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus)); +} + +TEST_F(SyncSourceResolverTest, SyncSourceResolverWillTryOtherSourcesWhenFirstNodeContainsOplogEntryWithNullTimestamp) { HostAndPort candidate1("node1", 12345); - HostAndPort candidate2("node1", 12345); + HostAndPort candidate2("node2", 12345); _selector->setChooseNewSyncSourceResult_forTest(candidate1); ASSERT_OK(_resolver->startup()); ASSERT_TRUE(_resolver->isActive()); @@ -608,6 +634,26 @@ TEST_F(SyncSourceResolverTest, ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus)); } + +TEST_F(SyncSourceResolverTest, SyncSourceResolverWillSucceedWithExtraFields) { + HostAndPort candidate1("node1", 12345); + _selector->setChooseNewSyncSourceResult_forTest(candidate1); + ASSERT_OK(_resolver->startup()); + ASSERT_TRUE(_resolver->isActive()); + + _scheduleFirstOplogEntryFetcherResponse(getNet(), + _selector.get(), + candidate1, + HostAndPort(), + {BSON("ts" << Timestamp(1, 1) << "t" << 1 << "note" + << "a")}); + + _scheduleRBIDResponse(getNet(), candidate1); + + _resolver->join(); + ASSERT_FALSE(_resolver->isActive()); + ASSERT_EQUALS(candidate1, unittest::assertGet(_response.syncSourceStatus)); +} /** * Constructs and schedules a network interface response using the given documents to the required * optime on the sync source candidate. |