diff options
author | Ben Caimano <ben.caimano@10gen.com> | 2020-02-27 00:32:09 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-03-02 19:10:35 +0000 |
commit | 8c0293225af97f60131db8754f89be24cfd42174 (patch) | |
tree | 3d482dd61f55a35a807e5f37f8533d523ce537b5 /src/mongo/db/mirror_maestro.cpp | |
parent | e29b8e770f26bfd3f9f4529ed139fcd19cfd2b2d (diff) | |
download | mongo-8c0293225af97f60131db8754f89be24cfd42174.tar.gz |
SERVER-45818 Rework read mirroring infrastructure
This commit:
- Fixes a busy polling bug in the TopologyVersionObserver
- Alters the MirroringSampler to sample in bursts
- Adjusts the MirrorMaestro to make use of the changed MirroringSampler
- Disables update command mirroring temporarily
Diffstat (limited to 'src/mongo/db/mirror_maestro.cpp')
-rw-r--r-- | src/mongo/db/mirror_maestro.cpp | 86 |
1 files changed, 56 insertions, 30 deletions
diff --git a/src/mongo/db/mirror_maestro.cpp b/src/mongo/db/mirror_maestro.cpp index ce05f13fac3..b82b16e4b8b 100644 --- a/src/mongo/db/mirror_maestro.cpp +++ b/src/mongo/db/mirror_maestro.cpp @@ -96,7 +96,9 @@ private: * * This command is expected to only run on the _executor */ - void _mirror(std::shared_ptr<CommandInvocation> invocation, MirroredReadsParameters params); + void _mirror(std::vector<HostAndPort> hosts, + std::shared_ptr<CommandInvocation> invocation, + MirroredReadsParameters params); /** * An enum detailing the liveness of the Maestro @@ -125,6 +127,7 @@ private: // call member functions upon. AtomicWord<bool> _isInitialized; MirroredReadsServerParameter* _params = nullptr; + MirroringSampler _sampler; std::shared_ptr<executor::TaskExecutor> _executor; repl::TopologyVersionObserver _topologyVersionObserver; }; @@ -234,25 +237,12 @@ void MirrorMaestroImpl::tryMirror(std::shared_ptr<CommandInvocation> invocation) gMirroredReadsSection.seen.fetchAndAdd(1); auto params = _params->_data.get(); - if (params.getSamplingRate() <= 0.0) { - // We'll never need to sample + auto samplingParams = MirroringSampler::SamplingParameters(params.getSamplingRate()); + if (!_sampler.shouldSample(samplingParams)) { + // If we wouldn't select a host, then nothing more to do return; } - // There is the potential to actually mirror requests, so schedule the _mirror() invocation - // out-of-line. This means the command itself can return quickly and we do the arduous work of - // building new bsons and evaluating randomness in a less important context. - ExecutorFuture(_executor) // - .getAsync([this, invocation = std::move(invocation), params = std::move(params)]( - const auto& status) mutable { - invariant(status.isOK()); - - _mirror(std::move(invocation), std::move(params)); - }); -} - -void MirrorMaestroImpl::_mirror(std::shared_ptr<CommandInvocation> invocation, - MirroredReadsParameters params) { auto imr = _topologyVersionObserver.getCached(); if (!imr) { // If we don't have an IsMasterResponse, we can't know where to send our mirrored @@ -260,11 +250,32 @@ void MirrorMaestroImpl::_mirror(std::shared_ptr<CommandInvocation> invocation, return; } - auto hosts = MirroringSampler::getMirroringTargets(imr, params.getSamplingRate()); + auto hosts = _sampler.getRawMirroringTargets(imr); if (hosts.empty()) { + // If we had no eligible hosts, nothing more to do return; } + // There is the potential to actually mirror requests, so schedule the _mirror() invocation + // out-of-line. This means the command itself can return quickly and we do the arduous work of + // building new bsons and evaluating randomness in a less important context. + ExecutorFuture(_executor) // + .getAsync([this, + hosts = std::move(hosts), + invocation = std::move(invocation), + params = std::move(params)](const auto& status) mutable { + if (ErrorCodes::isShutdownError(status)) { + return; + } + invariant(status.isOK()); + + _mirror(std::move(hosts), std::move(invocation), std::move(params)); + }); +} + +void MirrorMaestroImpl::_mirror(std::vector<HostAndPort> hosts, + std::shared_ptr<CommandInvocation> invocation, + MirroredReadsParameters params) { auto payload = [&] { BSONObjBuilder bob; @@ -274,39 +285,54 @@ void MirrorMaestroImpl::_mirror(std::shared_ptr<CommandInvocation> invocation, bob.append("maxTimeMS", params.getMaxTimeMS()); { - // Set secondary read preference + // Set secondaryPreferred read preference BSONObjBuilder rpBob = bob.subobjStart("$readPreference"); - rpBob.append("mode", "secondary"); + rpBob.append("mode", "secondaryPreferred"); + } + + { + // Set local read concern + BSONObjBuilder rcBob = bob.subobjStart("readConcern"); + rcBob.append("level", "local"); } return bob.obj(); }(); + // TODO SERVER-46514 When we figure out how to deallocate CommandInvocations effectively, we + // should do a normalized sample here. for (auto& host : hosts) { - static const auto mirrorResponseCallback = [host](auto& args) { + auto mirrorResponseCallback = [host](auto& args) { if (MONGO_likely(!mirrorMaestroExpectsResponse.shouldFail())) { // If we don't expect responses, then there is nothing to do here return; } - if (args.response.isOK()) { - gMirroredReadsSection.resolved.fetchAndAdd(1); - LOGV2_DEBUG(31457, - 4, - "Received response from {host}, response: {response}", - "host"_attr = host, - "response"_attr = args.response); - } + invariant(args.response.isOK()); + + gMirroredReadsSection.resolved.fetchAndAdd(1); + LOGV2_DEBUG(31457, + 4, + "Received response from {host}, response: {response}", + "host"_attr = host, + "response"_attr = args.response); }; auto newRequest = executor::RemoteCommandRequest( host, invocation->ns().db().toString(), payload, nullptr); + if (MONGO_likely(!mirrorMaestroExpectsResponse.shouldFail())) { + // If we're not expecting a response, set to fire and forget + newRequest.fireAndForgetMode = executor::RemoteCommandRequest::FireAndForgetMode::kOn; + } + LOGV2_DEBUG(31455, 4, "Mirroring to {host}, request: {request}", "host"_attr = host, "request"_attr = newRequest); + auto status = - _executor->scheduleRemoteCommand(newRequest, mirrorResponseCallback).getStatus(); + _executor->scheduleRemoteCommand(newRequest, std::move(mirrorResponseCallback)) + .getStatus(); if (!status.isOK()) { LOGV2_DEBUG( 31456, 2, "Failed to mirror read command due to {error}", "error"_attr = status); |