summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorMisha Tyulenev <misha@mongodb.com>2016-08-09 16:01:02 -0400
committerMisha Tyulenev <misha@mongodb.com>2016-08-09 18:48:57 -0400
commit4c6009e67d3e503f796b5afcbcbeaa95eba80b44 (patch)
treeadb17522b67867f32da6a0dd9aa666a90ae6783e /src/mongo
parentf82f0122a5db98ac1ac8a893e42b5c267aa24a02 (diff)
downloadmongo-4c6009e67d3e503f796b5afcbcbeaa95eba80b44.tar.gz
SERVER-22620 prefer config servers with recent opTime
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/client/read_preference.h4
-rw-r--r--src/mongo/client/replica_set_monitor.cpp42
-rw-r--r--src/mongo/client/replica_set_monitor_internal.h9
-rw-r--r--src/mongo/client/replica_set_monitor_test.cpp178
-rw-r--r--src/mongo/s/client/shard_remote.cpp20
5 files changed, 232 insertions, 21 deletions
diff --git a/src/mongo/client/read_preference.h b/src/mongo/client/read_preference.h
index 4b2833a4f13..c521913d94d 100644
--- a/src/mongo/client/read_preference.h
+++ b/src/mongo/client/read_preference.h
@@ -29,6 +29,7 @@
#pragma once
#include "mongo/db/jsobj.h"
+#include "mongo/db/repl/optime.h"
#include "mongo/util/duration.h"
namespace mongo {
@@ -127,7 +128,7 @@ struct ReadPreferenceSetting {
inline bool equals(const ReadPreferenceSetting& other) const {
return (pref == other.pref) && (tags == other.tags) &&
- (maxStalenessMS == other.maxStalenessMS);
+ (maxStalenessMS == other.maxStalenessMS) && (minOpTime == other.minOpTime);
}
/**
@@ -154,6 +155,7 @@ struct ReadPreferenceSetting {
ReadPreference pref;
TagSet tags;
Milliseconds maxStalenessMS{};
+ repl::OpTime minOpTime{};
/**
* The minimal value maxStalenessMS can have. It MUST be ReplicaSetMonitor::kRefreshPeriod * 2
diff --git a/src/mongo/client/replica_set_monitor.cpp b/src/mongo/client/replica_set_monitor.cpp
index c387f1674f7..c6e891144fd 100644
--- a/src/mongo/client/replica_set_monitor.cpp
+++ b/src/mongo/client/replica_set_monitor.cpp
@@ -39,6 +39,7 @@
#include "mongo/client/read_preference.h"
#include "mongo/client/replica_set_monitor_internal.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/bson_extract_optime.h"
#include "mongo/db/server_options.h"
#include "mongo/s/grid.h"
#include "mongo/stdx/condition_variable.h"
@@ -99,6 +100,10 @@ bool isMaster(const Node& node) {
return node.isMaster;
}
+bool opTimeGreater(const Node* lhs, const Node* rhs) {
+ return lhs->opTime > rhs->opTime;
+}
+
bool compareLatencies(const Node* lhs, const Node* rhs) {
// NOTE: this automatically compares Node::unknownLatency worse than all others.
return lhs->latencyMicros < rhs->latencyMicros;
@@ -847,6 +852,8 @@ void IsMasterReply::parse(const BSONObj& obj) {
if (auto lastWrite = lastWriteField["lastWriteDate"]) {
lastWriteDate = lastWrite.date();
}
+
+ uassertStatusOK(bsonExtractOpTimeField(lastWriteField, "opTime", &opTime));
}
} catch (const std::exception& e) {
ok = false;
@@ -915,8 +922,11 @@ void Node::update(const IsMasterReply& reply) {
}
}
- LOG(3) << "Updating " << host << " lastWriteDate to " << lastWriteDate;
+ LOG(3) << "Updating " << host << " lastWriteDate to " << reply.lastWriteDate;
lastWriteDate = reply.lastWriteDate;
+
+ LOG(3) << "Updating " << host << " opTime to " << reply.opTime;
+ opTime = reply.opTime;
lastWriteDateUpdateTime = Date_t::now();
}
@@ -1032,13 +1042,35 @@ HostAndPort SetState::getMatchingHost(const ReadPreferenceSetting& criteria) con
}
// don't do more complicated selection if not needed
- if (matchingNodes.empty())
+ if (matchingNodes.empty()) {
continue;
- if (matchingNodes.size() == 1)
+ }
+ if (matchingNodes.size() == 1) {
return matchingNodes.front()->host;
+ }
+
+ // Only consider nodes that satisfy the minOpTime
+ if (!criteria.minOpTime.isNull()) {
+ std::sort(matchingNodes.begin(), matchingNodes.end(), opTimeGreater);
+ for (size_t i = 0; i < matchingNodes.size(); i++) {
+ if (matchingNodes[i]->opTime < criteria.minOpTime) {
+ if (i == 0) {
+ // If no nodes satisfy the minOpTime criteria, we ignore the
+ // minOpTime requirement.
+ break;
+ }
+ matchingNodes.erase(matchingNodes.begin() + i, matchingNodes.end());
+ break;
+ }
+ }
+
+ if (matchingNodes.size() == 1) {
+ return matchingNodes.front()->host;
+ }
+ }
- // order by latency and don't consider hosts further than a threshold from the
- // closest.
+ // If there are multiple nodes satisfying the minOpTime, next order by latency
+ // and don't consider hosts further than a threshold from the closest.
std::sort(matchingNodes.begin(), matchingNodes.end(), compareLatencies);
for (size_t i = 1; i < matchingNodes.size(); i++) {
int64_t distance =
diff --git a/src/mongo/client/replica_set_monitor_internal.h b/src/mongo/client/replica_set_monitor_internal.h
index eff1670d558..300452398b7 100644
--- a/src/mongo/client/replica_set_monitor_internal.h
+++ b/src/mongo/client/replica_set_monitor_internal.h
@@ -80,6 +80,7 @@ struct ReplicaSetMonitor::IsMasterReply {
HostAndPort host;
int64_t latencyMicros; // ignored if negative
Date_t lastWriteDate{};
+ repl::OpTime opTime{};
};
struct ReplicaSetMonitor::SetState {
@@ -109,7 +110,12 @@ public:
* not match: { "dc": "nyc", "rack": 2 }
* not match: { "dc": "sf" }
*/
- bool matches(const BSONObj& tag) const;
+ bool matches(const BSONObj&) const;
+
+ /**
+ * Returns true if all of the tags in the tag set match node's tags
+ */
+ bool matches(const TagSet&) const;
/**
* Updates this Node based on information in reply. The reply must be from this host.
@@ -126,6 +132,7 @@ public:
Date_t lastWriteDate{}; // from isMasterReply
Date_t lastWriteDateUpdateTime{}; // set to the local system's time at the time of updating
// lastWriteDate
+ repl::OpTime opTime{}; // from isMasterReply
};
typedef std::vector<Node> Nodes;
diff --git a/src/mongo/client/replica_set_monitor_test.cpp b/src/mongo/client/replica_set_monitor_test.cpp
index 28b6980c523..45cd93cc7e2 100644
--- a/src/mongo/client/replica_set_monitor_test.cpp
+++ b/src/mongo/client/replica_set_monitor_test.cpp
@@ -33,6 +33,7 @@
#include "mongo/client/replica_set_monitor_internal.h"
#include "mongo/unittest/unittest.h"
+
namespace {
using namespace mongo;
@@ -1314,6 +1315,7 @@ TEST(ReplicaSetMonitorTests, TwoPrimaries2ndHasOlderConfigVersion) {
TEST(ReplicaSetMonitor, MaxStalenessMSMatch) {
SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
Refresher refresher(state);
+ repl::OpTime opTime{Timestamp{10, 10}, 10};
const ReadPreferenceSetting secondary(
ReadPreference::SecondaryOnly, TagSet(), Milliseconds(100000));
@@ -1341,7 +1343,9 @@ TEST(ReplicaSetMonitor, MaxStalenessMSMatch) {
<< hosts
<< "lastWrite"
<< BSON("lastWriteDate" << (nonStale ? lastWriteDateNonStale
- : lastWriteDateStale))
+ : lastWriteDateStale)
+ << "opTime"
+ << opTime.toBSON())
<< "ok"
<< true));
ns = refresher.getNextStep();
@@ -1364,6 +1368,7 @@ TEST(ReplicaSetMonitor, MaxStalenessMSMatch) {
TEST(ReplicaSetMonitor, MaxStalenessMSNoMatch) {
SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
Refresher refresher(state);
+ repl::OpTime opTime{Timestamp{10, 10}, 10};
const ReadPreferenceSetting secondary(ReadPreference::SecondaryOnly, TagSet(), Seconds(200));
BSONArray hosts = BSON_ARRAY("a"
@@ -1388,7 +1393,9 @@ TEST(ReplicaSetMonitor, MaxStalenessMSNoMatch) {
<< hosts
<< "lastWrite"
<< BSON("lastWriteDate" << (primary ? lastWriteDateNonStale
- : lastWriteDateStale))
+ : lastWriteDateStale)
+ << "opTime"
+ << opTime.toBSON())
<< "ok"
<< true));
@@ -1412,6 +1419,7 @@ TEST(ReplicaSetMonitor, MaxStalenessMSNoMatch) {
TEST(ReplicaSetMonitor, MaxStalenessMSNoPrimaryMatch) {
SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
Refresher refresher(state);
+ repl::OpTime opTime{Timestamp{10, 10}, 10};
const ReadPreferenceSetting secondary(ReadPreference::SecondaryOnly, TagSet(), Seconds(200));
BSONArray hosts = BSON_ARRAY("a"
@@ -1437,7 +1445,9 @@ TEST(ReplicaSetMonitor, MaxStalenessMSNoPrimaryMatch) {
<< "lastWrite"
<< BSON("lastWriteDate"
<< (isNonStale ? lastWriteDateNonStale
- : lastWriteDateStale))
+ : lastWriteDateStale)
+ << "opTime"
+ << opTime.toBSON())
<< "ok"
<< true));
@@ -1463,6 +1473,7 @@ TEST(ReplicaSetMonitor, MaxStalenessMSNoPrimaryMatch) {
TEST(ReplicaSetMonitor, MaxStalenessMSAllFailed) {
SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
Refresher refresher(state);
+ repl::OpTime opTime{Timestamp{10, 10}, 10};
const ReadPreferenceSetting secondary(ReadPreference::SecondaryOnly, TagSet(), Seconds(200));
BSONArray hosts = BSON_ARRAY("a"
@@ -1488,7 +1499,9 @@ TEST(ReplicaSetMonitor, MaxStalenessMSAllFailed) {
<< "lastWrite"
<< BSON("lastWriteDate"
<< (isNonStale ? lastWriteDateNonStale
- : lastWriteDateStale))
+ : lastWriteDateStale)
+ << "opTime"
+ << opTime.toBSON())
<< "ok"
<< true));
@@ -1513,6 +1526,7 @@ TEST(ReplicaSetMonitor, MaxStalenessMSAllFailed) {
TEST(ReplicaSetMonitor, MaxStalenessMSAllButPrimaryFailed) {
SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
Refresher refresher(state);
+ repl::OpTime opTime{Timestamp{10, 10}, 10};
const ReadPreferenceSetting secondary(ReadPreference::SecondaryOnly, TagSet(), Seconds(200));
BSONArray hosts = BSON_ARRAY("a"
@@ -1537,7 +1551,9 @@ TEST(ReplicaSetMonitor, MaxStalenessMSAllButPrimaryFailed) {
<< hosts
<< "lastWrite"
<< BSON("lastWriteDate" << (primary ? lastWriteDateNonStale
- : lastWriteDateStale))
+ : lastWriteDateStale)
+ << "opTime"
+ << opTime.toBSON())
<< "ok"
<< true));
ns = refresher.getNextStep();
@@ -1562,6 +1578,7 @@ TEST(ReplicaSetMonitor, MaxStalenessMSAllButPrimaryFailed) {
TEST(ReplicaSetMonitor, MaxStalenessMSOneSecondaryFailed) {
SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
Refresher refresher(state);
+ repl::OpTime opTime{Timestamp{10, 10}, 10};
const ReadPreferenceSetting secondary(ReadPreference::SecondaryOnly, TagSet(), Seconds(200));
BSONArray hosts = BSON_ARRAY("a"
@@ -1586,7 +1603,9 @@ TEST(ReplicaSetMonitor, MaxStalenessMSOneSecondaryFailed) {
<< hosts
<< "lastWrite"
<< BSON("lastWriteDate" << (primary ? lastWriteDateNonStale
- : lastWriteDateStale))
+ : lastWriteDateStale)
+ << "opTime"
+ << opTime.toBSON())
<< "ok"
<< true));
ns = refresher.getNextStep();
@@ -1607,9 +1626,10 @@ TEST(ReplicaSetMonitor, MaxStalenessMSOneSecondaryFailed) {
/**
* Success matching maxStalenessMS parameter when one secondary failed
*/
-TEST(ReplicaSetMonitor, MaxStalenessMSNonStaleeSecondaryMatched) {
+TEST(ReplicaSetMonitor, MaxStalenessMSNonStaleSecondaryMatched) {
SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
Refresher refresher(state);
+ repl::OpTime opTime{Timestamp{10, 10}, 10};
const ReadPreferenceSetting secondary(ReadPreference::SecondaryOnly, TagSet(), Seconds(200));
BSONArray hosts = BSON_ARRAY("a"
@@ -1636,7 +1656,9 @@ TEST(ReplicaSetMonitor, MaxStalenessMSNonStaleeSecondaryMatched) {
<< "lastWrite"
<< BSON("lastWriteDate"
<< (isNonStale ? lastWriteDateNonStale
- : lastWriteDateStale))
+ : lastWriteDateStale)
+ << "opTime"
+ << opTime.toBSON())
<< "ok"
<< true));
ns = refresher.getNextStep();
@@ -1735,5 +1757,145 @@ TEST(ReplicaSetMonitor, MaxStalenessMSZeroNoLastWrite) {
ASSERT(!state->getMatchingHost(secondary).empty());
}
+/**
+ * Success matching minOpTime
+ */
+TEST(ReplicaSetMonitor, MinOpTimeMatched) {
+ SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
+ Refresher refresher(state);
+
+ repl::OpTime minOpTimeSetting{Timestamp{10, 10}, 10};
+ repl::OpTime opTimeNonStale{Timestamp{10, 10}, 11};
+ repl::OpTime opTimeStale{Timestamp{10, 10}, 9};
+
+ ReadPreferenceSetting readPref(ReadPreference::Nearest, TagSet());
+ readPref.minOpTime = minOpTimeSetting;
+ BSONArray hosts = BSON_ARRAY("a"
+ << "b"
+ << "c");
+
+ // mock all replies
+ NextStep ns = refresher.getNextStep();
+ while (ns.step == NextStep::CONTACT_HOST) {
+ bool primary = ns.host.host() == "a";
+ bool isNonStale = ns.host.host() == "b";
+ BSONObj bson = BSON("setName"
+ << "name"
+ << "ismaster"
+ << primary
+ << "secondary"
+ << !primary
+ << "hosts"
+ << hosts
+ << "lastWrite"
+ << BSON("opTime" << (isNonStale ? opTimeNonStale.toBSON()
+ : opTimeStale.toBSON()))
+ << "ok"
+ << true);
+ refresher.receivedIsMaster(ns.host, -1, bson);
+ ns = refresher.getNextStep();
+ }
+
+ // Ensure that we have heard from all hosts and scan is done
+ ASSERT_EQUALS(ns.step, NextStep::DONE);
+ HostAndPort notStale = state->getMatchingHost(readPref);
+ ASSERT_EQUALS(notStale.host(), "b");
+}
+
+/**
+ * Failure matching minOpTime on primary for SecondaryOnly
+ */
+TEST(ReplicaSetMonitor, MinOpTimeNotMatched) {
+ SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
+ Refresher refresher(state);
+
+ repl::OpTime minOpTimeSetting{Timestamp{10, 10}, 10};
+ repl::OpTime opTimeNonStale{Timestamp{10, 10}, 11};
+ repl::OpTime opTimeStale{Timestamp{10, 10}, 9};
+
+ ReadPreferenceSetting readPref(ReadPreference::SecondaryOnly, TagSet());
+ readPref.minOpTime = minOpTimeSetting;
+ BSONArray hosts = BSON_ARRAY("a"
+ << "b"
+ << "c");
+
+ // mock all replies
+ NextStep ns = refresher.getNextStep();
+ while (ns.step == NextStep::CONTACT_HOST) {
+ bool primary = ns.host.host() == "a";
+ bool isNonStale = ns.host.host() == "a";
+ BSONObj bson = BSON("setName"
+ << "name"
+ << "ismaster"
+ << primary
+ << "secondary"
+ << !primary
+ << "hosts"
+ << hosts
+ << "lastWrite"
+ << BSON("opTime" << (isNonStale ? opTimeNonStale.toBSON()
+ : opTimeStale.toBSON()))
+ << "ok"
+ << true);
+ refresher.receivedIsMaster(ns.host, -1, bson);
+ ns = refresher.getNextStep();
+ }
+
+ // Ensure that we have heard from all hosts and scan is done
+ ASSERT_EQUALS(ns.step, NextStep::DONE);
+ HostAndPort notStale = state->getMatchingHost(readPref);
+ ASSERT(notStale.host() != "a");
+}
+
+/**
+ * Ignore minOpTime if none is matched
+ */
+TEST(ReplicaSetMonitor, MinOpTimeIgnored) {
+ SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
+ Refresher refresher(state);
+
+ repl::OpTime minOpTimeSetting{Timestamp{10, 10}, 10};
+ repl::OpTime opTimeStale{Timestamp{10, 10}, 9};
+
+ Date_t lastWriteDateStale = Date_t::now() - Seconds(1000);
+ Date_t lastWriteDateNonStale = Date_t::now() - Seconds(100);
+
+ ReadPreferenceSetting readPref(ReadPreference::SecondaryOnly, TagSet(), Seconds(200));
+ readPref.minOpTime = minOpTimeSetting;
+ BSONArray hosts = BSON_ARRAY("a"
+ << "b"
+ << "c");
+
+ // mock all replies
+ NextStep ns = refresher.getNextStep();
+ while (ns.step == NextStep::CONTACT_HOST) {
+ bool primary = ns.host.host() == "a";
+ bool isNonStale = ns.host.host() == "c";
+ BSONObj bson = BSON("setName"
+ << "name"
+ << "ismaster"
+ << primary
+ << "secondary"
+ << !primary
+ << "hosts"
+ << hosts
+ << "lastWrite"
+ << BSON("lastWriteDate"
+ << (isNonStale || primary ? lastWriteDateNonStale
+ : lastWriteDateStale)
+ << "opTime"
+ << opTimeStale.toBSON())
+ << "ok"
+ << true);
+ refresher.receivedIsMaster(ns.host, -1, bson);
+ ns = refresher.getNextStep();
+ }
+
+ // Ensure that we have heard from all hosts and scan is done
+ ASSERT_EQUALS(ns.step, NextStep::DONE);
+ HostAndPort notStale = state->getMatchingHost(readPref);
+ ASSERT_EQUALS(notStale.host(), "c");
+}
+
} // namespace
diff --git a/src/mongo/s/client/shard_remote.cpp b/src/mongo/s/client/shard_remote.cpp
index ad9e697bb55..006647b748c 100644
--- a/src/mongo/s/client/shard_remote.cpp
+++ b/src/mongo/s/client/shard_remote.cpp
@@ -174,8 +174,12 @@ Shard::HostWithResponse ShardRemote::_runCommand(OperationContext* txn,
Milliseconds maxTimeMSOverride,
const BSONObj& cmdObj) {
- const auto host =
- _targeter->findHost(readPref, RemoteCommandTargeter::selectFindHostMaxWaitTime(txn));
+ ReadPreferenceSetting readPrefWithMinOpTime(readPref);
+ if (getId() == "config") {
+ readPrefWithMinOpTime.minOpTime = grid.configOpTime();
+ }
+ const auto host = _targeter->findHost(readPrefWithMinOpTime,
+ RemoteCommandTargeter::selectFindHostMaxWaitTime(txn));
if (!host.isOK()) {
return Shard::HostWithResponse(boost::none, host.getStatus());
}
@@ -187,7 +191,7 @@ Shard::HostWithResponse ShardRemote::_runCommand(OperationContext* txn,
host.getValue(),
dbName,
appendMaxTimeToCmdObj(maxTimeMSOverride, cmdObj),
- _getMetadataForCommand(readPref),
+ _getMetadataForCommand(readPrefWithMinOpTime),
txn,
requestTimeout < Milliseconds::max() ? requestTimeout : RemoteCommandRequest::kNoTimeout);
@@ -238,8 +242,12 @@ StatusWith<Shard::QueryResponse> ShardRemote::_exhaustiveFindOnConfig(
const BSONObj& query,
const BSONObj& sort,
boost::optional<long long> limit) {
- const auto host =
- _targeter->findHost(readPref, RemoteCommandTargeter::selectFindHostMaxWaitTime(txn));
+ invariant(getId() == "config");
+ ReadPreferenceSetting readPrefWithMinOpTime(readPref);
+ readPrefWithMinOpTime.minOpTime = grid.configOpTime();
+
+ const auto host = _targeter->findHost(readPrefWithMinOpTime,
+ RemoteCommandTargeter::selectFindHostMaxWaitTime(txn));
if (!host.isOK()) {
return host.getStatus();
}
@@ -326,7 +334,7 @@ StatusWith<Shard::QueryResponse> ShardRemote::_exhaustiveFindOnConfig(
nss.db().toString(),
findCmdBuilder.done(),
fetcherCallback,
- _getMetadataForCommand(readPref),
+ _getMetadataForCommand(readPrefWithMinOpTime),
maxTimeMS);
Status scheduleStatus = fetcher.schedule();
if (!scheduleStatus.isOK()) {