diff options
author | Misha Tyulenev <misha@mongodb.com> | 2016-08-09 16:01:02 -0400 |
---|---|---|
committer | Misha Tyulenev <misha@mongodb.com> | 2016-08-09 18:48:57 -0400 |
commit | 4c6009e67d3e503f796b5afcbcbeaa95eba80b44 (patch) | |
tree | adb17522b67867f32da6a0dd9aa666a90ae6783e /src/mongo | |
parent | f82f0122a5db98ac1ac8a893e42b5c267aa24a02 (diff) | |
download | mongo-4c6009e67d3e503f796b5afcbcbeaa95eba80b44.tar.gz |
SERVER-22620 prefer config servers with recent opTime
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/client/read_preference.h | 4 | ||||
-rw-r--r-- | src/mongo/client/replica_set_monitor.cpp | 42 | ||||
-rw-r--r-- | src/mongo/client/replica_set_monitor_internal.h | 9 | ||||
-rw-r--r-- | src/mongo/client/replica_set_monitor_test.cpp | 178 | ||||
-rw-r--r-- | src/mongo/s/client/shard_remote.cpp | 20 |
5 files changed, 232 insertions, 21 deletions
diff --git a/src/mongo/client/read_preference.h b/src/mongo/client/read_preference.h index 4b2833a4f13..c521913d94d 100644 --- a/src/mongo/client/read_preference.h +++ b/src/mongo/client/read_preference.h @@ -29,6 +29,7 @@ #pragma once #include "mongo/db/jsobj.h" +#include "mongo/db/repl/optime.h" #include "mongo/util/duration.h" namespace mongo { @@ -127,7 +128,7 @@ struct ReadPreferenceSetting { inline bool equals(const ReadPreferenceSetting& other) const { return (pref == other.pref) && (tags == other.tags) && - (maxStalenessMS == other.maxStalenessMS); + (maxStalenessMS == other.maxStalenessMS) && (minOpTime == other.minOpTime); } /** @@ -154,6 +155,7 @@ struct ReadPreferenceSetting { ReadPreference pref; TagSet tags; Milliseconds maxStalenessMS{}; + repl::OpTime minOpTime{}; /** * The minimal value maxStalenessMS can have. It MUST be ReplicaSetMonitor::kRefreshPeriod * 2 diff --git a/src/mongo/client/replica_set_monitor.cpp b/src/mongo/client/replica_set_monitor.cpp index c387f1674f7..c6e891144fd 100644 --- a/src/mongo/client/replica_set_monitor.cpp +++ b/src/mongo/client/replica_set_monitor.cpp @@ -39,6 +39,7 @@ #include "mongo/client/read_preference.h" #include "mongo/client/replica_set_monitor_internal.h" #include "mongo/db/operation_context.h" +#include "mongo/db/repl/bson_extract_optime.h" #include "mongo/db/server_options.h" #include "mongo/s/grid.h" #include "mongo/stdx/condition_variable.h" @@ -99,6 +100,10 @@ bool isMaster(const Node& node) { return node.isMaster; } +bool opTimeGreater(const Node* lhs, const Node* rhs) { + return lhs->opTime > rhs->opTime; +} + bool compareLatencies(const Node* lhs, const Node* rhs) { // NOTE: this automatically compares Node::unknownLatency worse than all others. return lhs->latencyMicros < rhs->latencyMicros; @@ -847,6 +852,8 @@ void IsMasterReply::parse(const BSONObj& obj) { if (auto lastWrite = lastWriteField["lastWriteDate"]) { lastWriteDate = lastWrite.date(); } + + uassertStatusOK(bsonExtractOpTimeField(lastWriteField, "opTime", &opTime)); } } catch (const std::exception& e) { ok = false; @@ -915,8 +922,11 @@ void Node::update(const IsMasterReply& reply) { } } - LOG(3) << "Updating " << host << " lastWriteDate to " << lastWriteDate; + LOG(3) << "Updating " << host << " lastWriteDate to " << reply.lastWriteDate; lastWriteDate = reply.lastWriteDate; + + LOG(3) << "Updating " << host << " opTime to " << reply.opTime; + opTime = reply.opTime; lastWriteDateUpdateTime = Date_t::now(); } @@ -1032,13 +1042,35 @@ HostAndPort SetState::getMatchingHost(const ReadPreferenceSetting& criteria) con } // don't do more complicated selection if not needed - if (matchingNodes.empty()) + if (matchingNodes.empty()) { continue; - if (matchingNodes.size() == 1) + } + if (matchingNodes.size() == 1) { return matchingNodes.front()->host; + } + + // Only consider nodes that satisfy the minOpTime + if (!criteria.minOpTime.isNull()) { + std::sort(matchingNodes.begin(), matchingNodes.end(), opTimeGreater); + for (size_t i = 0; i < matchingNodes.size(); i++) { + if (matchingNodes[i]->opTime < criteria.minOpTime) { + if (i == 0) { + // If no nodes satisfy the minOpTime criteria, we ignore the + // minOpTime requirement. + break; + } + matchingNodes.erase(matchingNodes.begin() + i, matchingNodes.end()); + break; + } + } + + if (matchingNodes.size() == 1) { + return matchingNodes.front()->host; + } + } - // order by latency and don't consider hosts further than a threshold from the - // closest. + // If there are multiple nodes satisfying the minOpTime, next order by latency + // and don't consider hosts further than a threshold from the closest. std::sort(matchingNodes.begin(), matchingNodes.end(), compareLatencies); for (size_t i = 1; i < matchingNodes.size(); i++) { int64_t distance = diff --git a/src/mongo/client/replica_set_monitor_internal.h b/src/mongo/client/replica_set_monitor_internal.h index eff1670d558..300452398b7 100644 --- a/src/mongo/client/replica_set_monitor_internal.h +++ b/src/mongo/client/replica_set_monitor_internal.h @@ -80,6 +80,7 @@ struct ReplicaSetMonitor::IsMasterReply { HostAndPort host; int64_t latencyMicros; // ignored if negative Date_t lastWriteDate{}; + repl::OpTime opTime{}; }; struct ReplicaSetMonitor::SetState { @@ -109,7 +110,12 @@ public: * not match: { "dc": "nyc", "rack": 2 } * not match: { "dc": "sf" } */ - bool matches(const BSONObj& tag) const; + bool matches(const BSONObj&) const; + + /** + * Returns true if all of the tags in the tag set match node's tags + */ + bool matches(const TagSet&) const; /** * Updates this Node based on information in reply. The reply must be from this host. @@ -126,6 +132,7 @@ public: Date_t lastWriteDate{}; // from isMasterReply Date_t lastWriteDateUpdateTime{}; // set to the local system's time at the time of updating // lastWriteDate + repl::OpTime opTime{}; // from isMasterReply }; typedef std::vector<Node> Nodes; diff --git a/src/mongo/client/replica_set_monitor_test.cpp b/src/mongo/client/replica_set_monitor_test.cpp index 28b6980c523..45cd93cc7e2 100644 --- a/src/mongo/client/replica_set_monitor_test.cpp +++ b/src/mongo/client/replica_set_monitor_test.cpp @@ -33,6 +33,7 @@ #include "mongo/client/replica_set_monitor_internal.h" #include "mongo/unittest/unittest.h" + namespace { using namespace mongo; @@ -1314,6 +1315,7 @@ TEST(ReplicaSetMonitorTests, TwoPrimaries2ndHasOlderConfigVersion) { TEST(ReplicaSetMonitor, MaxStalenessMSMatch) { SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); Refresher refresher(state); + repl::OpTime opTime{Timestamp{10, 10}, 10}; const ReadPreferenceSetting secondary( ReadPreference::SecondaryOnly, TagSet(), Milliseconds(100000)); @@ -1341,7 +1343,9 @@ TEST(ReplicaSetMonitor, MaxStalenessMSMatch) { << hosts << "lastWrite" << BSON("lastWriteDate" << (nonStale ? lastWriteDateNonStale - : lastWriteDateStale)) + : lastWriteDateStale) + << "opTime" + << opTime.toBSON()) << "ok" << true)); ns = refresher.getNextStep(); @@ -1364,6 +1368,7 @@ TEST(ReplicaSetMonitor, MaxStalenessMSMatch) { TEST(ReplicaSetMonitor, MaxStalenessMSNoMatch) { SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); Refresher refresher(state); + repl::OpTime opTime{Timestamp{10, 10}, 10}; const ReadPreferenceSetting secondary(ReadPreference::SecondaryOnly, TagSet(), Seconds(200)); BSONArray hosts = BSON_ARRAY("a" @@ -1388,7 +1393,9 @@ TEST(ReplicaSetMonitor, MaxStalenessMSNoMatch) { << hosts << "lastWrite" << BSON("lastWriteDate" << (primary ? lastWriteDateNonStale - : lastWriteDateStale)) + : lastWriteDateStale) + << "opTime" + << opTime.toBSON()) << "ok" << true)); @@ -1412,6 +1419,7 @@ TEST(ReplicaSetMonitor, MaxStalenessMSNoMatch) { TEST(ReplicaSetMonitor, MaxStalenessMSNoPrimaryMatch) { SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); Refresher refresher(state); + repl::OpTime opTime{Timestamp{10, 10}, 10}; const ReadPreferenceSetting secondary(ReadPreference::SecondaryOnly, TagSet(), Seconds(200)); BSONArray hosts = BSON_ARRAY("a" @@ -1437,7 +1445,9 @@ TEST(ReplicaSetMonitor, MaxStalenessMSNoPrimaryMatch) { << "lastWrite" << BSON("lastWriteDate" << (isNonStale ? lastWriteDateNonStale - : lastWriteDateStale)) + : lastWriteDateStale) + << "opTime" + << opTime.toBSON()) << "ok" << true)); @@ -1463,6 +1473,7 @@ TEST(ReplicaSetMonitor, MaxStalenessMSNoPrimaryMatch) { TEST(ReplicaSetMonitor, MaxStalenessMSAllFailed) { SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); Refresher refresher(state); + repl::OpTime opTime{Timestamp{10, 10}, 10}; const ReadPreferenceSetting secondary(ReadPreference::SecondaryOnly, TagSet(), Seconds(200)); BSONArray hosts = BSON_ARRAY("a" @@ -1488,7 +1499,9 @@ TEST(ReplicaSetMonitor, MaxStalenessMSAllFailed) { << "lastWrite" << BSON("lastWriteDate" << (isNonStale ? lastWriteDateNonStale - : lastWriteDateStale)) + : lastWriteDateStale) + << "opTime" + << opTime.toBSON()) << "ok" << true)); @@ -1513,6 +1526,7 @@ TEST(ReplicaSetMonitor, MaxStalenessMSAllFailed) { TEST(ReplicaSetMonitor, MaxStalenessMSAllButPrimaryFailed) { SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); Refresher refresher(state); + repl::OpTime opTime{Timestamp{10, 10}, 10}; const ReadPreferenceSetting secondary(ReadPreference::SecondaryOnly, TagSet(), Seconds(200)); BSONArray hosts = BSON_ARRAY("a" @@ -1537,7 +1551,9 @@ TEST(ReplicaSetMonitor, MaxStalenessMSAllButPrimaryFailed) { << hosts << "lastWrite" << BSON("lastWriteDate" << (primary ? lastWriteDateNonStale - : lastWriteDateStale)) + : lastWriteDateStale) + << "opTime" + << opTime.toBSON()) << "ok" << true)); ns = refresher.getNextStep(); @@ -1562,6 +1578,7 @@ TEST(ReplicaSetMonitor, MaxStalenessMSAllButPrimaryFailed) { TEST(ReplicaSetMonitor, MaxStalenessMSOneSecondaryFailed) { SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); Refresher refresher(state); + repl::OpTime opTime{Timestamp{10, 10}, 10}; const ReadPreferenceSetting secondary(ReadPreference::SecondaryOnly, TagSet(), Seconds(200)); BSONArray hosts = BSON_ARRAY("a" @@ -1586,7 +1603,9 @@ TEST(ReplicaSetMonitor, MaxStalenessMSOneSecondaryFailed) { << hosts << "lastWrite" << BSON("lastWriteDate" << (primary ? lastWriteDateNonStale - : lastWriteDateStale)) + : lastWriteDateStale) + << "opTime" + << opTime.toBSON()) << "ok" << true)); ns = refresher.getNextStep(); @@ -1607,9 +1626,10 @@ TEST(ReplicaSetMonitor, MaxStalenessMSOneSecondaryFailed) { /** * Success matching maxStalenessMS parameter when one secondary failed */ -TEST(ReplicaSetMonitor, MaxStalenessMSNonStaleeSecondaryMatched) { +TEST(ReplicaSetMonitor, MaxStalenessMSNonStaleSecondaryMatched) { SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); Refresher refresher(state); + repl::OpTime opTime{Timestamp{10, 10}, 10}; const ReadPreferenceSetting secondary(ReadPreference::SecondaryOnly, TagSet(), Seconds(200)); BSONArray hosts = BSON_ARRAY("a" @@ -1636,7 +1656,9 @@ TEST(ReplicaSetMonitor, MaxStalenessMSNonStaleeSecondaryMatched) { << "lastWrite" << BSON("lastWriteDate" << (isNonStale ? lastWriteDateNonStale - : lastWriteDateStale)) + : lastWriteDateStale) + << "opTime" + << opTime.toBSON()) << "ok" << true)); ns = refresher.getNextStep(); @@ -1735,5 +1757,145 @@ TEST(ReplicaSetMonitor, MaxStalenessMSZeroNoLastWrite) { ASSERT(!state->getMatchingHost(secondary).empty()); } +/** + * Success matching minOpTime + */ +TEST(ReplicaSetMonitor, MinOpTimeMatched) { + SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); + Refresher refresher(state); + + repl::OpTime minOpTimeSetting{Timestamp{10, 10}, 10}; + repl::OpTime opTimeNonStale{Timestamp{10, 10}, 11}; + repl::OpTime opTimeStale{Timestamp{10, 10}, 9}; + + ReadPreferenceSetting readPref(ReadPreference::Nearest, TagSet()); + readPref.minOpTime = minOpTimeSetting; + BSONArray hosts = BSON_ARRAY("a" + << "b" + << "c"); + + // mock all replies + NextStep ns = refresher.getNextStep(); + while (ns.step == NextStep::CONTACT_HOST) { + bool primary = ns.host.host() == "a"; + bool isNonStale = ns.host.host() == "b"; + BSONObj bson = BSON("setName" + << "name" + << "ismaster" + << primary + << "secondary" + << !primary + << "hosts" + << hosts + << "lastWrite" + << BSON("opTime" << (isNonStale ? opTimeNonStale.toBSON() + : opTimeStale.toBSON())) + << "ok" + << true); + refresher.receivedIsMaster(ns.host, -1, bson); + ns = refresher.getNextStep(); + } + + // Ensure that we have heard from all hosts and scan is done + ASSERT_EQUALS(ns.step, NextStep::DONE); + HostAndPort notStale = state->getMatchingHost(readPref); + ASSERT_EQUALS(notStale.host(), "b"); +} + +/** + * Failure matching minOpTime on primary for SecondaryOnly + */ +TEST(ReplicaSetMonitor, MinOpTimeNotMatched) { + SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); + Refresher refresher(state); + + repl::OpTime minOpTimeSetting{Timestamp{10, 10}, 10}; + repl::OpTime opTimeNonStale{Timestamp{10, 10}, 11}; + repl::OpTime opTimeStale{Timestamp{10, 10}, 9}; + + ReadPreferenceSetting readPref(ReadPreference::SecondaryOnly, TagSet()); + readPref.minOpTime = minOpTimeSetting; + BSONArray hosts = BSON_ARRAY("a" + << "b" + << "c"); + + // mock all replies + NextStep ns = refresher.getNextStep(); + while (ns.step == NextStep::CONTACT_HOST) { + bool primary = ns.host.host() == "a"; + bool isNonStale = ns.host.host() == "a"; + BSONObj bson = BSON("setName" + << "name" + << "ismaster" + << primary + << "secondary" + << !primary + << "hosts" + << hosts + << "lastWrite" + << BSON("opTime" << (isNonStale ? opTimeNonStale.toBSON() + : opTimeStale.toBSON())) + << "ok" + << true); + refresher.receivedIsMaster(ns.host, -1, bson); + ns = refresher.getNextStep(); + } + + // Ensure that we have heard from all hosts and scan is done + ASSERT_EQUALS(ns.step, NextStep::DONE); + HostAndPort notStale = state->getMatchingHost(readPref); + ASSERT(notStale.host() != "a"); +} + +/** + * Ignore minOpTime if none is matched + */ +TEST(ReplicaSetMonitor, MinOpTimeIgnored) { + SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); + Refresher refresher(state); + + repl::OpTime minOpTimeSetting{Timestamp{10, 10}, 10}; + repl::OpTime opTimeStale{Timestamp{10, 10}, 9}; + + Date_t lastWriteDateStale = Date_t::now() - Seconds(1000); + Date_t lastWriteDateNonStale = Date_t::now() - Seconds(100); + + ReadPreferenceSetting readPref(ReadPreference::SecondaryOnly, TagSet(), Seconds(200)); + readPref.minOpTime = minOpTimeSetting; + BSONArray hosts = BSON_ARRAY("a" + << "b" + << "c"); + + // mock all replies + NextStep ns = refresher.getNextStep(); + while (ns.step == NextStep::CONTACT_HOST) { + bool primary = ns.host.host() == "a"; + bool isNonStale = ns.host.host() == "c"; + BSONObj bson = BSON("setName" + << "name" + << "ismaster" + << primary + << "secondary" + << !primary + << "hosts" + << hosts + << "lastWrite" + << BSON("lastWriteDate" + << (isNonStale || primary ? lastWriteDateNonStale + : lastWriteDateStale) + << "opTime" + << opTimeStale.toBSON()) + << "ok" + << true); + refresher.receivedIsMaster(ns.host, -1, bson); + ns = refresher.getNextStep(); + } + + // Ensure that we have heard from all hosts and scan is done + ASSERT_EQUALS(ns.step, NextStep::DONE); + HostAndPort notStale = state->getMatchingHost(readPref); + ASSERT_EQUALS(notStale.host(), "c"); +} + } // namespace diff --git a/src/mongo/s/client/shard_remote.cpp b/src/mongo/s/client/shard_remote.cpp index ad9e697bb55..006647b748c 100644 --- a/src/mongo/s/client/shard_remote.cpp +++ b/src/mongo/s/client/shard_remote.cpp @@ -174,8 +174,12 @@ Shard::HostWithResponse ShardRemote::_runCommand(OperationContext* txn, Milliseconds maxTimeMSOverride, const BSONObj& cmdObj) { - const auto host = - _targeter->findHost(readPref, RemoteCommandTargeter::selectFindHostMaxWaitTime(txn)); + ReadPreferenceSetting readPrefWithMinOpTime(readPref); + if (getId() == "config") { + readPrefWithMinOpTime.minOpTime = grid.configOpTime(); + } + const auto host = _targeter->findHost(readPrefWithMinOpTime, + RemoteCommandTargeter::selectFindHostMaxWaitTime(txn)); if (!host.isOK()) { return Shard::HostWithResponse(boost::none, host.getStatus()); } @@ -187,7 +191,7 @@ Shard::HostWithResponse ShardRemote::_runCommand(OperationContext* txn, host.getValue(), dbName, appendMaxTimeToCmdObj(maxTimeMSOverride, cmdObj), - _getMetadataForCommand(readPref), + _getMetadataForCommand(readPrefWithMinOpTime), txn, requestTimeout < Milliseconds::max() ? requestTimeout : RemoteCommandRequest::kNoTimeout); @@ -238,8 +242,12 @@ StatusWith<Shard::QueryResponse> ShardRemote::_exhaustiveFindOnConfig( const BSONObj& query, const BSONObj& sort, boost::optional<long long> limit) { - const auto host = - _targeter->findHost(readPref, RemoteCommandTargeter::selectFindHostMaxWaitTime(txn)); + invariant(getId() == "config"); + ReadPreferenceSetting readPrefWithMinOpTime(readPref); + readPrefWithMinOpTime.minOpTime = grid.configOpTime(); + + const auto host = _targeter->findHost(readPrefWithMinOpTime, + RemoteCommandTargeter::selectFindHostMaxWaitTime(txn)); if (!host.isOK()) { return host.getStatus(); } @@ -326,7 +334,7 @@ StatusWith<Shard::QueryResponse> ShardRemote::_exhaustiveFindOnConfig( nss.db().toString(), findCmdBuilder.done(), fetcherCallback, - _getMetadataForCommand(readPref), + _getMetadataForCommand(readPrefWithMinOpTime), maxTimeMS); Status scheduleStatus = fetcher.schedule(); if (!scheduleStatus.isOK()) { |