summaryrefslogtreecommitdiff
path: root/src/mongo/db/mirror_maestro.cpp
diff options
context:
space:
mode:
authorBen Caimano <ben.caimano@10gen.com>2020-02-27 00:32:09 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-03-02 19:10:35 +0000
commit8c0293225af97f60131db8754f89be24cfd42174 (patch)
tree3d482dd61f55a35a807e5f37f8533d523ce537b5 /src/mongo/db/mirror_maestro.cpp
parente29b8e770f26bfd3f9f4529ed139fcd19cfd2b2d (diff)
downloadmongo-8c0293225af97f60131db8754f89be24cfd42174.tar.gz
SERVER-45818 Rework read mirroring infrastructure
This commit: - Fixes a busy polling bug in the TopologyVersionObserver - Alters the MirroringSampler to sample in bursts - Adjusts the MirrorMaestro to make use of the changed MirroringSampler - Disables update command mirroring temporarily
Diffstat (limited to 'src/mongo/db/mirror_maestro.cpp')
-rw-r--r--src/mongo/db/mirror_maestro.cpp86
1 files changed, 56 insertions, 30 deletions
diff --git a/src/mongo/db/mirror_maestro.cpp b/src/mongo/db/mirror_maestro.cpp
index ce05f13fac3..b82b16e4b8b 100644
--- a/src/mongo/db/mirror_maestro.cpp
+++ b/src/mongo/db/mirror_maestro.cpp
@@ -96,7 +96,9 @@ private:
*
* This command is expected to only run on the _executor
*/
- void _mirror(std::shared_ptr<CommandInvocation> invocation, MirroredReadsParameters params);
+ void _mirror(std::vector<HostAndPort> hosts,
+ std::shared_ptr<CommandInvocation> invocation,
+ MirroredReadsParameters params);
/**
* An enum detailing the liveness of the Maestro
@@ -125,6 +127,7 @@ private:
// call member functions upon.
AtomicWord<bool> _isInitialized;
MirroredReadsServerParameter* _params = nullptr;
+ MirroringSampler _sampler;
std::shared_ptr<executor::TaskExecutor> _executor;
repl::TopologyVersionObserver _topologyVersionObserver;
};
@@ -234,25 +237,12 @@ void MirrorMaestroImpl::tryMirror(std::shared_ptr<CommandInvocation> invocation)
gMirroredReadsSection.seen.fetchAndAdd(1);
auto params = _params->_data.get();
- if (params.getSamplingRate() <= 0.0) {
- // We'll never need to sample
+ auto samplingParams = MirroringSampler::SamplingParameters(params.getSamplingRate());
+ if (!_sampler.shouldSample(samplingParams)) {
+ // If we wouldn't select a host, then nothing more to do
return;
}
- // There is the potential to actually mirror requests, so schedule the _mirror() invocation
- // out-of-line. This means the command itself can return quickly and we do the arduous work of
- // building new bsons and evaluating randomness in a less important context.
- ExecutorFuture(_executor) //
- .getAsync([this, invocation = std::move(invocation), params = std::move(params)](
- const auto& status) mutable {
- invariant(status.isOK());
-
- _mirror(std::move(invocation), std::move(params));
- });
-}
-
-void MirrorMaestroImpl::_mirror(std::shared_ptr<CommandInvocation> invocation,
- MirroredReadsParameters params) {
auto imr = _topologyVersionObserver.getCached();
if (!imr) {
// If we don't have an IsMasterResponse, we can't know where to send our mirrored
@@ -260,11 +250,32 @@ void MirrorMaestroImpl::_mirror(std::shared_ptr<CommandInvocation> invocation,
return;
}
- auto hosts = MirroringSampler::getMirroringTargets(imr, params.getSamplingRate());
+ auto hosts = _sampler.getRawMirroringTargets(imr);
if (hosts.empty()) {
+ // If we had no eligible hosts, nothing more to do
return;
}
+ // There is the potential to actually mirror requests, so schedule the _mirror() invocation
+ // out-of-line. This means the command itself can return quickly and we do the arduous work of
+ // building new bsons and evaluating randomness in a less important context.
+ ExecutorFuture(_executor) //
+ .getAsync([this,
+ hosts = std::move(hosts),
+ invocation = std::move(invocation),
+ params = std::move(params)](const auto& status) mutable {
+ if (ErrorCodes::isShutdownError(status)) {
+ return;
+ }
+ invariant(status.isOK());
+
+ _mirror(std::move(hosts), std::move(invocation), std::move(params));
+ });
+}
+
+void MirrorMaestroImpl::_mirror(std::vector<HostAndPort> hosts,
+ std::shared_ptr<CommandInvocation> invocation,
+ MirroredReadsParameters params) {
auto payload = [&] {
BSONObjBuilder bob;
@@ -274,39 +285,54 @@ void MirrorMaestroImpl::_mirror(std::shared_ptr<CommandInvocation> invocation,
bob.append("maxTimeMS", params.getMaxTimeMS());
{
- // Set secondary read preference
+ // Set secondaryPreferred read preference
BSONObjBuilder rpBob = bob.subobjStart("$readPreference");
- rpBob.append("mode", "secondary");
+ rpBob.append("mode", "secondaryPreferred");
+ }
+
+ {
+ // Set local read concern
+ BSONObjBuilder rcBob = bob.subobjStart("readConcern");
+ rcBob.append("level", "local");
}
return bob.obj();
}();
+ // TODO SERVER-46514 When we figure out how to deallocate CommandInvocations effectively, we
+ // should do a normalized sample here.
for (auto& host : hosts) {
- static const auto mirrorResponseCallback = [host](auto& args) {
+ auto mirrorResponseCallback = [host](auto& args) {
if (MONGO_likely(!mirrorMaestroExpectsResponse.shouldFail())) {
// If we don't expect responses, then there is nothing to do here
return;
}
- if (args.response.isOK()) {
- gMirroredReadsSection.resolved.fetchAndAdd(1);
- LOGV2_DEBUG(31457,
- 4,
- "Received response from {host}, response: {response}",
- "host"_attr = host,
- "response"_attr = args.response);
- }
+ invariant(args.response.isOK());
+
+ gMirroredReadsSection.resolved.fetchAndAdd(1);
+ LOGV2_DEBUG(31457,
+ 4,
+ "Received response from {host}, response: {response}",
+ "host"_attr = host,
+ "response"_attr = args.response);
};
auto newRequest = executor::RemoteCommandRequest(
host, invocation->ns().db().toString(), payload, nullptr);
+ if (MONGO_likely(!mirrorMaestroExpectsResponse.shouldFail())) {
+ // If we're not expecting a response, set to fire and forget
+ newRequest.fireAndForgetMode = executor::RemoteCommandRequest::FireAndForgetMode::kOn;
+ }
+
LOGV2_DEBUG(31455,
4,
"Mirroring to {host}, request: {request}",
"host"_attr = host,
"request"_attr = newRequest);
+
auto status =
- _executor->scheduleRemoteCommand(newRequest, mirrorResponseCallback).getStatus();
+ _executor->scheduleRemoteCommand(newRequest, std::move(mirrorResponseCallback))
+ .getStatus();
if (!status.isOK()) {
LOGV2_DEBUG(
31456, 2, "Failed to mirror read command due to {error}", "error"_attr = status);