diff options
author | Ben Caimano <ben.caimano@10gen.com> | 2020-02-18 18:25:00 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-02-27 02:18:39 +0000 |
commit | 74961aa50ded11df8612bed9e0bd40f6c9466560 (patch) | |
tree | 933de51e64dae1d5b66bfbfd1e5ddf9e978beb07 | |
parent | 679d1e1028ddbdd1ac5778f3e0ab0cb3ffd5ee27 (diff) | |
download | mongo-74961aa50ded11df8612bed9e0bd40f6c9466560.tar.gz |
SERVER-46002 Attach core read mirroring functionality
-rw-r--r-- | jstests/noPassthrough/mirror_reads.js | 124 | ||||
-rw-r--r-- | src/mongo/bson/bsonobj.cpp | 20 | ||||
-rw-r--r-- | src/mongo/bson/bsonobj.h | 5 | ||||
-rw-r--r-- | src/mongo/db/SConscript | 22 | ||||
-rw-r--r-- | src/mongo/db/commands.cpp | 42 | ||||
-rw-r--r-- | src/mongo/db/commands.h | 26 | ||||
-rw-r--r-- | src/mongo/db/commands/count_cmd.cpp | 22 | ||||
-rw-r--r-- | src/mongo/db/commands/distinct.cpp | 19 | ||||
-rw-r--r-- | src/mongo/db/commands/find_and_modify.cpp | 23 | ||||
-rw-r--r-- | src/mongo/db/commands/find_cmd.cpp | 29 | ||||
-rw-r--r-- | src/mongo/db/db.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/mirror_maestro.cpp | 200 | ||||
-rw-r--r-- | src/mongo/db/mirror_maestro.h | 8 | ||||
-rw-r--r-- | src/mongo/db/mirror_maestro.idl | 7 | ||||
-rw-r--r-- | src/mongo/db/query/query_planner.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_version_observer.cpp | 163 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_version_observer.h | 36 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_version_observer_test.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/service_entry_point_common.cpp | 3 | ||||
-rw-r--r-- | src/mongo/s/commands/strategy.cpp | 3 |
20 files changed, 612 insertions, 156 deletions
diff --git a/jstests/noPassthrough/mirror_reads.js b/jstests/noPassthrough/mirror_reads.js new file mode 100644 index 00000000000..692a9ada958 --- /dev/null +++ b/jstests/noPassthrough/mirror_reads.js @@ -0,0 +1,124 @@ +/** + * Verify that mirroredReads happen in response to setParameters.mirroredReads + * + * @tags: [requires_replication] + */ + +(function() { +"use strict"; + +function setParameter({rst, value}) { + return rst.getPrimary().adminCommand({setParameter: 1, mirrorReads: value}); +} + +const kBurstCount = 1000; +const kDbName = "mirrored_reads_test"; +const kCollName = "test"; + +function sendAndCheckReads({rst, cmd, minRate, maxRate}) { + let startMirroredReads = + rst.getPrimary().getDB(kDbName).serverStatus({mirroredReads: 1}).mirroredReads; + + jsTestLog(`Sending ${kBurstCount} request burst of ${tojson(cmd)} to primary`); + + // Blast out a set of trivial reads + for (var i = 0; i < kBurstCount; ++i) { + rst.getPrimary().getDB(kDbName).runCommand(cmd); + } + + jsTestLog(`Verifying ${tojson(cmd)} was mirrored`); + + // Verify that the reads have been observed on the primary + { + let currentMirroredReads = + rst.getPrimary().getDB(kDbName).serverStatus({mirroredReads: 1}).mirroredReads; + assert.eq(startMirroredReads.seen + kBurstCount, currentMirroredReads.seen); + } + + // Verify that the reads mirrored to the secondaries have responded + assert.soon(() => { + let currentMirroredReads = + rst.getPrimary().getDB(kDbName).serverStatus({mirroredReads: 1}).mirroredReads; + + let readsSeen = currentMirroredReads.seen - startMirroredReads.seen; + let readsMirrored = currentMirroredReads.resolved - startMirroredReads.resolved; + + let numNodes = rst.getSecondaries().length; + jsTestLog(`Seen ${readsSeen} requests; ` + + `verified ${readsMirrored / 2} requests ` + + `x ${numNodes} nodes`); + + let rate = readsMirrored / readsSeen / numNodes; + return (rate >= minRate) && (rate <= maxRate); + }, "Did not verify all requests within time limit", 10000); + + jsTestLog(`Verified ${tojson(cmd)} was mirrored`); +} + +function verifyMirrorReads(rst, cmd) { + { + jsTestLog(`Verifying disabled read mirroring with ${tojson(cmd)}`); + let samplingRate = 0.0; + + assert.commandWorked(setParameter({rst: rst, value: {samplingRate: samplingRate}})); + sendAndCheckReads({rst: rst, cmd: cmd, minRate: samplingRate, maxRate: samplingRate}); + } + + { + jsTestLog(`Verifying full read mirroring with ${tojson(cmd)}`); + let samplingRate = 1.0; + + assert.commandWorked(setParameter({rst: rst, value: {samplingRate: samplingRate}})); + sendAndCheckReads({rst: rst, cmd: cmd, minRate: samplingRate, maxRate: samplingRate}); + } + + { + jsTestLog(`Verifying partial read mirroring with ${tojson(cmd)}`); + let samplingRate = 0.5; + let gaussDeviation = .34; + let max = samplingRate + gaussDeviation; + let min = samplingRate - gaussDeviation; + + assert.commandWorked(setParameter({rst: rst, value: {samplingRate: samplingRate}})); + sendAndCheckReads({rst: rst, cmd: cmd, minRate: min, maxRate: max}); + } +} + +{ + const rst = new ReplSetTest({ + nodes: 3, + nodeOptions: + {setParameter: {"failpoint.mirrorMaestroExpectsResponse": tojson({mode: "alwaysOn"})}} + }); + rst.startSet(); + rst.initiateWithHighElectionTimeout(); + + jsTestLog(`Attempting invalid mirrorReads parameters`); + assert.commandFailed(setParameter({rst: rst, value: 0.5})); + assert.commandFailed(setParameter({rst: rst, value: "half"})); + assert.commandFailed(setParameter({rst: rst, value: {samplingRate: -1.0}})); + assert.commandFailed(setParameter({rst: rst, value: {samplingRate: 1.01}})); + assert.commandFailed(setParameter({rst: rst, value: {somplingRate: 1.0}})); + + // Put in a datum + { + let ret = + rst.getPrimary().getDB(kDbName).runCommand({insert: kCollName, documents: [{x: 1}]}); + assert.commandWorked(ret); + } + + jsTestLog("Verifying mirrored reads for 'find' commands"); + verifyMirrorReads(rst, {find: kCollName, filter: {}}); + + jsTestLog("Verifying mirrored reads for 'count' commands"); + verifyMirrorReads(rst, {count: kCollName, query: {}}); + + jsTestLog("Verifying mirrored reads for 'distinct' commands"); + verifyMirrorReads(rst, {distinct: kCollName, key: "x"}); + + jsTestLog("Verifying mirrored reads for 'findAndModify' commands"); + verifyMirrorReads(rst, {findAndModify: kCollName, query: {}, update: {'$inc': {x: 1}}}); + + rst.stopSet(); +} +})(); diff --git a/src/mongo/bson/bsonobj.cpp b/src/mongo/bson/bsonobj.cpp index 74d3edf1588..284058feecf 100644 --- a/src/mongo/bson/bsonobj.cpp +++ b/src/mongo/bson/bsonobj.cpp @@ -346,8 +346,7 @@ bool BSONObj::isFieldNamePrefixOf(const BSONObj& otherObj) const { return !a.more(); } -BSONObj BSONObj::extractFieldsUnDotted(const BSONObj& pattern) const { - BSONObjBuilder b; +void BSONObj::extractFieldsUndotted(BSONObjBuilder* b, const BSONObj& pattern) const { BSONObjIterator i(pattern); while (i.moreWithEOO()) { BSONElement e = i.next(); @@ -355,13 +354,17 @@ BSONObj BSONObj::extractFieldsUnDotted(const BSONObj& pattern) const { break; BSONElement x = getField(e.fieldName()); if (!x.eoo()) - b.appendAs(x, ""); + b->appendAs(x, ""); } - return b.obj(); } -BSONObj BSONObj::filterFieldsUndotted(const BSONObj& filter, bool inFilter) const { +BSONObj BSONObj::extractFieldsUndotted(const BSONObj& pattern) const { BSONObjBuilder b; + extractFieldsUndotted(&b, pattern); + return b.obj(); +} + +void BSONObj::filterFieldsUndotted(BSONObjBuilder* b, const BSONObj& filter, bool inFilter) const { BSONObjIterator i(*this); while (i.moreWithEOO()) { BSONElement e = i.next(); @@ -369,8 +372,13 @@ BSONObj BSONObj::filterFieldsUndotted(const BSONObj& filter, bool inFilter) cons break; BSONElement x = filter.getField(e.fieldName()); if ((x.eoo() && !inFilter) || (!x.eoo() && inFilter)) - b.append(e); + b->append(e); } +} + +BSONObj BSONObj::filterFieldsUndotted(const BSONObj& filter, bool inFilter) const { + BSONObjBuilder b; + filterFieldsUndotted(&b, filter, inFilter); return b.obj(); } diff --git a/src/mongo/bson/bsonobj.h b/src/mongo/bson/bsonobj.h index 215652bcd29..f22c9391bb6 100644 --- a/src/mongo/bson/bsonobj.h +++ b/src/mongo/bson/bsonobj.h @@ -52,6 +52,7 @@ namespace mongo { +class BSONObjBuilder; class BSONObjStlIterator; class ExtendedCanonicalV200Generator; class ExtendedRelaxedV200Generator; @@ -393,9 +394,11 @@ public: * this.extractFieldsUnDotted({b : "blah"}) -> {"" : 5} * */ - BSONObj extractFieldsUnDotted(const BSONObj& pattern) const; + BSONObj extractFieldsUndotted(const BSONObj& pattern) const; + void extractFieldsUndotted(BSONObjBuilder* b, const BSONObj& pattern) const; BSONObj filterFieldsUndotted(const BSONObj& filter, bool inFilter) const; + void filterFieldsUndotted(BSONObjBuilder* b, const BSONObj& filter, bool inFilter) const; BSONElement getFieldUsingIndexNames(StringData fieldName, const BSONObj& indexKey) const; diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 193a759b5a5..f959c2520ae 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -488,20 +488,25 @@ env.Library( source=[ 'mirror_maestro.cpp', env.Idlc('mirror_maestro.idl')[0], + 'mirroring_sampler.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/executor/task_executor_interface', + "$BUILD_DIR/mongo/rpc/protocol", 'service_context', + "$BUILD_DIR/mongo/util/net/network", ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/bson/util/bson_extract', '$BUILD_DIR/mongo/executor/network_interface_factory', '$BUILD_DIR/mongo/executor/thread_pool_task_executor', '$BUILD_DIR/mongo/idl/idl_parser', + '$BUILD_DIR/mongo/util/concurrency/thread_pool', + 'commands/server_status', 'repl/replica_set_messages', 'repl/repl_coordinator_interface', - '$BUILD_DIR/mongo/util/concurrency/thread_pool', + 'repl/topology_version_observer', ], ) @@ -1824,19 +1829,6 @@ env.Library( ], ) -env.Library( - target='mirroring_sampler', - source=[ - 'mirroring_sampler.cpp', - ], - LIBDEPS=[ - "$BUILD_DIR/mongo/base", - "$BUILD_DIR/mongo/db/repl/replica_set_messages", - "$BUILD_DIR/mongo/rpc/protocol", - "$BUILD_DIR/mongo/util/net/network", - ], -) - envWithAsio = env.Clone() envWithAsio.InjectThirdParty(libraries=['asio']) @@ -1926,7 +1918,7 @@ envWithAsio.CppUnitTest( 'logical_session_id', 'logical_session_id_helpers', 'logical_time', - 'mirroring_sampler', + 'mirror_maestro', 'namespace_string', 'op_observer', 'op_observer_impl', diff --git a/src/mongo/db/commands.cpp b/src/mongo/db/commands.cpp index 433bd033588..142037458bb 100644 --- a/src/mongo/db/commands.cpp +++ b/src/mongo/db/commands.cpp @@ -690,9 +690,26 @@ void CommandHelpers::handleMarkKillOnClientDisconnect(OperationContext* opCtx, }); } +namespace { +// We store the CommandInvocation as a shared_ptr on the OperationContext in case we need to persist +// the invocation past the lifetime of the op. If so, this shared_ptr can be copied off to another +// thread. If not, there is only one shared_ptr and the invocation goes out of scope when the op +// ends. +auto invocationForOpCtx = OperationContext::declareDecoration<std::shared_ptr<CommandInvocation>>(); +} // namespace + ////////////////////////////////////////////////////////////// // CommandInvocation +void CommandInvocation::set(OperationContext* opCtx, + std::shared_ptr<CommandInvocation> invocation) { + invocationForOpCtx(opCtx) = std::move(invocation); +} + +std::shared_ptr<CommandInvocation> CommandInvocation::get(OperationContext* opCtx) { + return invocationForOpCtx(opCtx); +} + CommandInvocation::~CommandInvocation() = default; void CommandInvocation::checkAuthorization(OperationContext* opCtx, @@ -737,13 +754,13 @@ public: BasicCommandWithReplyBuilderInterface* command) : CommandInvocation(command), _command(command), - _request(&request), - _dbName(_request->getDatabase().toString()) {} + _request(request), + _dbName(_request.getDatabase().toString()) {} private: void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) override { - opCtx->lockState()->setDebugInfo(redact(_request->body).toString()); - bool ok = _command->runWithReplyBuilder(opCtx, _dbName, _request->body, result); + opCtx->lockState()->setDebugInfo(redact(_request.body).toString()); + bool ok = _command->runWithReplyBuilder(opCtx, _dbName, _request.body, result); if (!ok) { BSONObjBuilder bob = result->getBodyBuilder(); CommandHelpers::appendSimpleCommandStatus(bob, ok); @@ -753,7 +770,7 @@ private: void explain(OperationContext* opCtx, ExplainOptions::Verbosity verbosity, rpc::ReplyBuilderInterface* result) override { - uassertStatusOK(_command->explain(opCtx, *_request, verbosity, result)); + uassertStatusOK(_command->explain(opCtx, _request, verbosity, result)); } NamespaceString ns() const override { @@ -768,6 +785,15 @@ private: return _command->supportsReadConcern(cmdObj(), level); } + bool supportsReadMirroring() const override { + return _command->supportsReadMirroring(cmdObj()); + } + + void appendMirrorableRequest(BSONObjBuilder* bob) const override { + invariant(cmdObj().isOwned()); + _command->appendMirrorableRequest(bob, cmdObj()); + } + bool allowsAfterClusterTime() const override { return _command->allowsAfterClusterTime(cmdObj()); } @@ -778,15 +804,15 @@ private: void doCheckAuthorization(OperationContext* opCtx) const override { uassertStatusOK(_command->checkAuthForOperation( - opCtx, _request->getDatabase().toString(), _request->body)); + opCtx, _request.getDatabase().toString(), _request.body)); } const BSONObj& cmdObj() const { - return _request->body; + return _request.body; } BasicCommandWithReplyBuilderInterface* const _command; - const OpMsgRequest* const _request; + const OpMsgRequest _request; const std::string _dbName; }; diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h index 3eff8eb062d..a2ea871d67b 100644 --- a/src/mongo/db/commands.h +++ b/src/mongo/db/commands.h @@ -514,6 +514,9 @@ public: virtual ~CommandInvocation(); + static void set(OperationContext* opCtx, std::shared_ptr<CommandInvocation> invocation); + static std::shared_ptr<CommandInvocation> get(OperationContext* opCtx); + /** * Runs the command, filling in result. Any exception thrown from here will cause result * to be reset and filled in with the error. Non-const to permit modifying the request @@ -555,13 +558,20 @@ public: } /** - * Returns this invocation's support for readMirroring. + * Return if this invocation can be mirrored to secondaries */ virtual bool supportsReadMirroring() const { return false; } /** + * Return a BSONObj that can be safely mirrored to secondaries for cache warming + */ + virtual void appendMirrorableRequest(BSONObjBuilder*) const { + MONGO_UNREACHABLE; + } + + /** * Returns true if command allows afterClusterTime in its readConcern. The command may not allow * it if it is specifically intended not to take any LockManager locks. Waiting for * afterClusterTime takes the MODE_IS lock. @@ -713,6 +723,20 @@ public: {kDefaultReadConcernNotPermitted}}; } + /** + * Return if the cmdObj can be mirrored to secondaries in some form + */ + virtual bool supportsReadMirroring(const BSONObj& cmdObj) const { + return false; + } + + /** + * Return a modified form of cmdObj that can be safely mirrored to secondaries for cache warming + */ + virtual void appendMirrorableRequest(BSONObjBuilder*, const BSONObj&) const { + MONGO_UNREACHABLE; + } + virtual bool allowsAfterClusterTime(const BSONObj& cmdObj) const { return true; } diff --git a/src/mongo/db/commands/count_cmd.cpp b/src/mongo/db/commands/count_cmd.cpp index 14cdb0c0f98..2f2a7c4b09d 100644 --- a/src/mongo/db/commands/count_cmd.cpp +++ b/src/mongo/db/commands/count_cmd.cpp @@ -94,6 +94,10 @@ public: return ReadConcernSupportResult::allSupportedAndDefaultPermitted(); } + bool supportsReadMirroring(const BSONObj&) const override { + return true; + } + ReadWriteType getReadWriteType() const override { return ReadWriteType::kRead; } @@ -261,6 +265,24 @@ public: return true; } + void appendMirrorableRequest(BSONObjBuilder* bob, const BSONObj& cmdObj) const override { + static const auto kMirrorableKeys = [] { + BSONObjBuilder keyBob; + + keyBob.append("count", 1); + keyBob.append("query", 1); + keyBob.append("skip", 1); + keyBob.append("limit", 1); + keyBob.append("hint", 1); + keyBob.append("collation", 1); + + return keyBob.obj(); + }(); + + // Filter the keys that can be mirrored + cmdObj.filterFieldsUndotted(bob, kMirrorableKeys, true); + } + } cmdCount; } // namespace diff --git a/src/mongo/db/commands/distinct.cpp b/src/mongo/db/commands/distinct.cpp index f580846dc0b..cbbaa62d243 100644 --- a/src/mongo/db/commands/distinct.cpp +++ b/src/mongo/db/commands/distinct.cpp @@ -93,6 +93,10 @@ public: return ReadConcernSupportResult::allSupportedAndDefaultPermitted(); } + bool supportsReadMirroring(const BSONObj&) const override { + return true; + } + ReadWriteType getReadWriteType() const override { return ReadWriteType::kRead; } @@ -310,6 +314,21 @@ public: return true; } + void appendMirrorableRequest(BSONObjBuilder* bob, const BSONObj& cmdObj) const override { + static const auto kMirrorableKeys = [] { + BSONObjBuilder keyBob; + keyBob.append("distinct", 1); + keyBob.append("key", 1); + keyBob.append("query", 1); + keyBob.append("collation", 1); + return keyBob.obj(); + }(); + + // Filter the keys that can be mirrored + cmdObj.filterFieldsUndotted(bob, kMirrorableKeys, true); + } + + } distinctCmd; } // namespace diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp index 02f1c8152ed..5d995f1c70e 100644 --- a/src/mongo/db/commands/find_and_modify.cpp +++ b/src/mongo/db/commands/find_and_modify.cpp @@ -229,6 +229,10 @@ public: return true; } + bool supportsReadMirroring(const BSONObj&) const override { + return true; + } + void addRequiredPrivileges(const std::string& dbname, const BSONObj& cmdObj, std::vector<Privilege>* out) const override { @@ -527,6 +531,25 @@ public: }); } + void appendMirrorableRequest(BSONObjBuilder* bob, const BSONObj& cmdObj) const override { + // Filter the keys that can be mirrored + static const auto kMirrorableKeys = [] { + BSONObjBuilder keyBob; + keyBob.append("sort", 1); + keyBob.append("collation", 1); + return keyBob.obj(); + }(); + + bob->append("find", cmdObj.firstElement().String()); + bob->append("filter", cmdObj["query"].Obj()); + + cmdObj.filterFieldsUndotted(bob, kMirrorableKeys, true); + + // Prevent the find from returning multiple documents since we can + bob->append("batchSize", 1); + bob->append("singleBatch", true); + } + } cmdFindAndModify; } // namespace diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index 0bb4abdccaa..5d5fcca7e8f 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -182,7 +182,9 @@ public: class Invocation final : public CommandInvocation { public: Invocation(const FindCmd* definition, const OpMsgRequest& request, StringData dbName) - : CommandInvocation(definition), _request(request), _dbName(dbName) {} + : CommandInvocation(definition), _request(request), _dbName(dbName) { + invariant(_request.body.isOwned()); + } private: bool supportsWriteConcern() const override { @@ -592,8 +594,31 @@ public: firstBatch.done(cursorId, nss.ns()); } + void appendMirrorableRequest(BSONObjBuilder* bob) const override { + // Filter the keys that can be mirrored + static const auto kMirrorableKeys = [] { + BSONObjBuilder keyBob; + keyBob.append("find", 1); + keyBob.append("filter", 1); + keyBob.append("skip", 1); + keyBob.append("limit", 1); + keyBob.append("sort", 1); + keyBob.append("hint", 1); + keyBob.append("collation", 1); + keyBob.append("min", 1); + keyBob.append("max", 1); + return keyBob.obj(); + }(); + + _request.body.filterFieldsUndotted(bob, kMirrorableKeys, true); + + // Tell the find to only return a single batch + bob->append("batchSize", 1); + bob->append("singleBatch", true); + } + private: - const OpMsgRequest& _request; + const OpMsgRequest _request; const StringData _dbName; }; diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index 886592fe567..e79178ed28c 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -277,13 +277,10 @@ void initWireSpec() { void initializeCommandHooks(ServiceContext* serviceContext) { class MongodCommandInvocationHooks final : public CommandInvocationHooks { - void onBeforeRun(OperationContext* opCtx, - const OpMsgRequest& request, - CommandInvocation* invocation) {} - void onAfterRun(OperationContext* opCtx, - const OpMsgRequest& request, - CommandInvocation* invocation) { - MirrorMaestro::tryMirror(opCtx, request, invocation); + void onBeforeRun(OperationContext*, const OpMsgRequest&, CommandInvocation*) {} + + void onAfterRun(OperationContext* opCtx, const OpMsgRequest&, CommandInvocation*) { + MirrorMaestro::tryMirrorRequest(opCtx); } }; diff --git a/src/mongo/db/mirror_maestro.cpp b/src/mongo/db/mirror_maestro.cpp index a31bd43dd30..ce05f13fac3 100644 --- a/src/mongo/db/mirror_maestro.cpp +++ b/src/mongo/db/mirror_maestro.cpp @@ -40,10 +40,15 @@ #include "mongo/bson/bsonelement.h" #include "mongo/bson/bsonobjbuilder.h" +#include "mongo/bson/json.h" #include "mongo/db/commands.h" +#include "mongo/db/commands/server_status.h" #include "mongo/db/mirror_maestro_gen.h" +#include "mongo/db/mirroring_sampler.h" #include "mongo/db/repl/is_master_response.h" #include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/repl/topology_version_observer.h" +#include "mongo/executor/connection_pool.h" #include "mongo/executor/network_interface_factory.h" #include "mongo/executor/remote_command_request.h" #include "mongo/executor/thread_pool_task_executor.h" @@ -53,6 +58,21 @@ namespace mongo { +namespace { +constexpr auto kMirrorMaestroName = "MirrorMaestro"_sd; +constexpr auto kMirrorMaestroThreadPoolMaxThreads = 2ull; // Just enough to allow concurrency +constexpr auto kMirrorMaestroConnPoolMinSize = 1ull; // Always be able to mirror eventually +constexpr auto kMirrorMaestroConnPoolMaxSize = 4ull; // Never use more than a handful + +constexpr auto kMirroredReadsParamName = "mirrorReads"_sd; + +constexpr auto kMirroredReadsName = "mirroredReads"_sd; +constexpr auto kMirroredReadsSeenKey = "seen"_sd; +constexpr auto kMirroredReadsSentKey = "sent"_sd; +constexpr auto kMirroredReadsResolvedKey = "resolved"_sd; + +MONGO_FAIL_POINT_DEFINE(mirrorMaestroExpectsResponse); + class MirrorMaestroImpl { public: /** @@ -68,12 +88,17 @@ public: /** * Mirror only if this maestro has been initialized */ - void tryMirror(OperationContext* opCtx, - const OpMsgRequest& request, - const CommandInvocation* invocation) noexcept; + void tryMirror(std::shared_ptr<CommandInvocation> invocation) noexcept; private: /** + * Attempt to mirror invocation to a subset of hosts based on params + * + * This command is expected to only run on the _executor + */ + void _mirror(std::shared_ptr<CommandInvocation> invocation, MirroredReadsParameters params); + + /** * An enum detailing the liveness of the Maestro * * The state transition map for liveness looks like so: @@ -99,15 +124,47 @@ private: // inately thread safe. If _isInitialized is false, there may not even be correct pointers to // call member functions upon. AtomicWord<bool> _isInitialized; + MirroredReadsServerParameter* _params = nullptr; std::shared_ptr<executor::TaskExecutor> _executor; + repl::TopologyVersionObserver _topologyVersionObserver; }; +const auto getMirrorMaestroImpl = ServiceContext::declareDecoration<MirrorMaestroImpl>(); -namespace { -constexpr auto kMirrorMaestroName = "MirrorMaestro"_sd; -constexpr auto kMirrorMaestroThreadPoolMaxThreads = 2ull; +// Define a new serverStatus section "mirroredReads" +class MirroredReadsSection final : public ServerStatusSection { +public: + using CounterT = long long; + + MirroredReadsSection() : ServerStatusSection(kMirroredReadsName.toString()) {} + + bool includeByDefault() const override { + return false; + } + + BSONObj generateSection(OperationContext* opCtx, const BSONElement&) const override { + BSONObjBuilder section; + section.append(kMirroredReadsSeenKey, seen.loadRelaxed()); + section.append(kMirroredReadsSentKey, sent.loadRelaxed()); + + if (MONGO_unlikely(mirrorMaestroExpectsResponse.shouldFail())) { + // We only can see if the command resolved if we got a response + section.append(kMirroredReadsResolvedKey, resolved.loadRelaxed()); + } + + return section.obj(); + }; + + AtomicWord<CounterT> seen; + AtomicWord<CounterT> sent; + AtomicWord<CounterT> resolved; +} gMirroredReadsSection; + +auto parseMirroredReadsParameters(const BSONObj& obj) { + IDLParserErrorContext ctx("mirrorReads"); + return MirroredReadsParameters::parse(ctx, obj); +} -const auto getMirrorMaestroImpl = ServiceContext::declareDecoration<MirrorMaestroImpl>(); } // namespace void MirroredReadsServerParameter::append(OperationContext*, @@ -120,18 +177,21 @@ void MirroredReadsServerParameter::append(OperationContext*, Status MirroredReadsServerParameter::set(const BSONElement& value) try { auto obj = value.Obj(); - IDLParserErrorContext ctx(name()); - _data = MirroredReadsParameters::parse(ctx, obj); + _data = parseMirroredReadsParameters(obj); return Status::OK(); } catch (const AssertionException& e) { return e.toStatus(); } -Status MirroredReadsServerParameter::setFromString(const std::string&) { - using namespace fmt::literals; - auto msg = "{:s} cannot be set from a string."_format(name()); - return {ErrorCodes::BadValue, msg}; +Status MirroredReadsServerParameter::setFromString(const std::string& str) try { + auto obj = fromjson(str); + + _data = parseMirroredReadsParameters(obj); + + return Status::OK(); +} catch (const AssertionException& e) { + return e.toStatus(); } void MirrorMaestro::init(ServiceContext* serviceContext) noexcept { @@ -151,35 +211,110 @@ void MirrorMaestro::shutdown(ServiceContext* serviceContext) noexcept { impl.shutdown(); } -void MirrorMaestro::tryMirror(OperationContext* opCtx, - const OpMsgRequest& request, - const CommandInvocation* invocation) noexcept { +void MirrorMaestro::tryMirrorRequest(OperationContext* opCtx) noexcept { auto& impl = getMirrorMaestroImpl(opCtx->getServiceContext()); - impl.tryMirror(opCtx, request, invocation); + + auto invocation = CommandInvocation::get(opCtx); + + impl.tryMirror(std::move(invocation)); } -void MirrorMaestroImpl::tryMirror(OperationContext* opCtx, - const OpMsgRequest& request, - const CommandInvocation* invocation) noexcept { +void MirrorMaestroImpl::tryMirror(std::shared_ptr<CommandInvocation> invocation) noexcept { if (!_isInitialized.load()) { // If we're not even available, nothing to do return; } + invariant(invocation); if (!invocation->supportsReadMirroring()) { // That's all, folks return; } - // TODO SERVER-45816 will add the sampling function and attach the command - repl::IsMasterResponse* imr = nullptr; + gMirroredReadsSection.seen.fetchAndAdd(1); + + auto params = _params->_data.get(); + if (params.getSamplingRate() <= 0.0) { + // We'll never need to sample + return; + } + + // There is the potential to actually mirror requests, so schedule the _mirror() invocation + // out-of-line. This means the command itself can return quickly and we do the arduous work of + // building new bsons and evaluating randomness in a less important context. + ExecutorFuture(_executor) // + .getAsync([this, invocation = std::move(invocation), params = std::move(params)]( + const auto& status) mutable { + invariant(status.isOK()); + + _mirror(std::move(invocation), std::move(params)); + }); +} + +void MirrorMaestroImpl::_mirror(std::shared_ptr<CommandInvocation> invocation, + MirroredReadsParameters params) { + auto imr = _topologyVersionObserver.getCached(); if (!imr) { // If we don't have an IsMasterResponse, we can't know where to send our mirrored // request return; } - MONGO_UNREACHABLE; + auto hosts = MirroringSampler::getMirroringTargets(imr, params.getSamplingRate()); + if (hosts.empty()) { + return; + } + + auto payload = [&] { + BSONObjBuilder bob; + + invocation->appendMirrorableRequest(&bob); + + // Limit the maxTimeMS + bob.append("maxTimeMS", params.getMaxTimeMS()); + + { + // Set secondary read preference + BSONObjBuilder rpBob = bob.subobjStart("$readPreference"); + rpBob.append("mode", "secondary"); + } + return bob.obj(); + }(); + + for (auto& host : hosts) { + static const auto mirrorResponseCallback = [host](auto& args) { + if (MONGO_likely(!mirrorMaestroExpectsResponse.shouldFail())) { + // If we don't expect responses, then there is nothing to do here + return; + } + + if (args.response.isOK()) { + gMirroredReadsSection.resolved.fetchAndAdd(1); + LOGV2_DEBUG(31457, + 4, + "Received response from {host}, response: {response}", + "host"_attr = host, + "response"_attr = args.response); + } + }; + + auto newRequest = executor::RemoteCommandRequest( + host, invocation->ns().db().toString(), payload, nullptr); + LOGV2_DEBUG(31455, + 4, + "Mirroring to {host}, request: {request}", + "host"_attr = host, + "request"_attr = newRequest); + auto status = + _executor->scheduleRemoteCommand(newRequest, mirrorResponseCallback).getStatus(); + if (!status.isOK()) { + LOGV2_DEBUG( + 31456, 2, "Failed to mirror read command due to {error}", "error"_attr = status); + continue; + } + + gMirroredReadsSection.sent.fetchAndAdd(1); + } } void MirrorMaestroImpl::init(ServiceContext* serviceContext) noexcept { @@ -203,7 +338,13 @@ void MirrorMaestroImpl::init(ServiceContext* serviceContext) noexcept { } break; }; - auto makeNet = [&] { return executor::makeNetworkInterface(kMirrorMaestroName.toString()); }; + auto makeNet = [&] { + executor::ConnectionPool::Options options; + options.minConnections = kMirrorMaestroConnPoolMinSize; + options.maxConnections = kMirrorMaestroConnPoolMaxSize; + return executor::makeNetworkInterface( + kMirrorMaestroName.toString(), {}, {}, std::move(options)); + }; auto makePool = [&] { ThreadPool::Options options; @@ -214,12 +355,17 @@ void MirrorMaestroImpl::init(ServiceContext* serviceContext) noexcept { _executor = std::make_shared<executor::ThreadPoolTaskExecutor>(makePool(), makeNet()); _executor->startup(); + _topologyVersionObserver.init(serviceContext); + + _params = + ServerParameterSet::getGlobal()->get<MirroredReadsServerParameter>(kMirroredReadsParamName); + invariant(_params); // Set _initGuard.liveness to kRunning _initGuard.liveness = Liveness::kRunning; - // Mark the maestro as initialized. It is now safe to call tryMirror(), use the _executor, or - // otherwise rely on members to be alive and well. + // Mark the maestro as initialized. It is now safe to call tryMirrorRequest(), use the + // _executor, or otherwise rely on members to be alive and well. _isInitialized.store(true); } @@ -240,6 +386,8 @@ void MirrorMaestroImpl::shutdown() noexcept { } break; }; + _topologyVersionObserver.shutdown(); + if (_executor) { _executor->shutdown(); } diff --git a/src/mongo/db/mirror_maestro.h b/src/mongo/db/mirror_maestro.h index da99be92787..f595fd0bcc9 100644 --- a/src/mongo/db/mirror_maestro.h +++ b/src/mongo/db/mirror_maestro.h @@ -65,14 +65,12 @@ public: static void shutdown(ServiceContext* serviceContext) noexcept; /** - * Check if a given invocation+request should be mirrored to secondaries, and schedule that work - * if so. + * Check if the request associated with opCtx should be mirrored to secondaries, and schedule + * that work if so. * * This function will noop if the MirrorMaestro is currently being initialized or shutdown. */ - static void tryMirror(OperationContext* opCtx, - const OpMsgRequest& request, - const CommandInvocation* invocation) noexcept; + static void tryMirrorRequest(OperationContext* opCtx) noexcept; }; } // namespace mongo diff --git a/src/mongo/db/mirror_maestro.idl b/src/mongo/db/mirror_maestro.idl index aa058107391..d486f5aba97 100644 --- a/src/mongo/db/mirror_maestro.idl +++ b/src/mongo/db/mirror_maestro.idl @@ -46,6 +46,13 @@ structs: validator: gte: 0.0 lte: 1.0 + maxTimeMS: + description: >- + The maxTimeMS to apply to mirrored reads + type: int + default: 1000 + validator: + gt: 0 server_parameters: mirrorReads: diff --git a/src/mongo/db/query/query_planner.cpp b/src/mongo/db/query/query_planner.cpp index 48d8f49b519..df713b26676 100644 --- a/src/mongo/db/query/query_planner.cpp +++ b/src/mongo/db/query/query_planner.cpp @@ -142,7 +142,7 @@ string optionString(size_t options) { } static BSONObj getKeyFromQuery(const BSONObj& keyPattern, const BSONObj& query) { - return query.extractFieldsUnDotted(keyPattern); + return query.extractFieldsUndotted(keyPattern); } static bool indexCompatibleMaxMin(const BSONObj& obj, diff --git a/src/mongo/db/repl/topology_version_observer.cpp b/src/mongo/db/repl/topology_version_observer.cpp index 9d7cef07fd2..635801a0fb7 100644 --- a/src/mongo/db/repl/topology_version_observer.cpp +++ b/src/mongo/db/repl/topology_version_observer.cpp @@ -40,62 +40,83 @@ namespace mongo { namespace repl { -void TopologyVersionObserver::init(ReplicationCoordinator* replCoordinator) noexcept { - invariant(replCoordinator); - stdx::lock_guard<Mutex> lk(_mutex); - invariant(_state.load() == State::kUninitialized); - +void TopologyVersionObserver::init(ServiceContext* serviceContext, + ReplicationCoordinator* replCoordinator) noexcept { LOGV2_INFO(40440, "Starting {topologyVersionObserverName}", "topologyVersionObserverName"_attr = toString()); - _replCoordinator = replCoordinator; + + stdx::unique_lock lk(_mutex); + + _serviceContext = serviceContext; + invariant(_serviceContext); + + _replCoordinator = + replCoordinator ? replCoordinator : ReplicationCoordinator::get(_serviceContext); + invariant(_replCoordinator); invariant(!_thread); + invariant(_state.load() == State::kUninitialized); _thread = stdx::thread([&]() { this->_workerThreadBody(); }); - // Wait for the observer thread to update the status. - while (_state.load() != State::kRunning) { - } + _cv.wait(lk, [&] { return _state.load() != State::kUninitialized; }); } void TopologyVersionObserver::shutdown() noexcept { - stdx::unique_lock<Mutex> lk(_mutex); - if (_state.load() == State::kUninitialized) { - return; - } + auto shouldWaitForShutdown = _shouldShutdown.swap(true); + if (shouldWaitForShutdown) { + // If we aren't the first ones to call shutdown, wait for the thread to stop + stdx::unique_lock lk(_mutex); - // Check if another `shutdown()` has already completed. - if (!_thread) { + _cv.wait(lk, [&] { return _state.load() == State::kShutdown; }); + invariant(_state.load() == State::kShutdown); return; } LOGV2_INFO(40441, "Stopping {topologyVersionObserverName}", "topologyVersionObserverName"_attr = toString()); - auto state = _state.load(); - invariant(state == State::kRunning || state == State::kShutdown); - - // Wait for the observer client to exit from its main loop. - // Observer thread must update the state before attempting to acquire the mutex. - while (_state.load() == State::kRunning) { - invariant(_observerClient); - _observerClient->lock(); - auto opCtx = _observerClient->getOperationContext(); - if (opCtx) { - opCtx->markKilled(ErrorCodes::ShutdownInProgress); - } - _observerClient->unlock(); - } - invariant(_state.load() == State::kShutdown); - auto thread = std::exchange(_thread, boost::none); - lk.unlock(); + // Wait for the thread to stop and steal it to the local stack + auto thread = [&] { + stdx::unique_lock lk(_mutex); + + _cv.wait(lk, [&] { + if (_state.load() != State::kRunning) { + // If we are no longer running, then we can safely join + return true; + } + + // If we are still running, attempt to kill any opCtx + invariant(_observerClient); + stdx::lock_guard clientLk(*_observerClient); + auto opCtx = _observerClient->getOperationContext(); + if (opCtx) { + _serviceContext->killOperation(clientLk, opCtx, ErrorCodes::ShutdownInProgress); + } + + return false; + }); + invariant(_state.load() == State::kShutdown); + + return std::exchange(_thread, boost::none); + }(); + + if (!thread) { + // We never started + return; + } - invariant(thread); + // Finally join thread->join(); } std::shared_ptr<const IsMasterResponse> TopologyVersionObserver::getCached() noexcept { + if (_state.load() != State::kRunning || _shouldShutdown.load()) { + // Early return if we know there isn't a worker + return {}; + } + // Acquires the lock to avoid potential races with `_workerThreadBody()`. // Atomics cannot be used here as `shared_ptr` cannot be atomically updated. stdx::lock_guard<Mutex> lk(_mutex); @@ -107,7 +128,7 @@ std::string TopologyVersionObserver::toString() const { } std::shared_ptr<const IsMasterResponse> TopologyVersionObserver::_getIsMasterResponse( - boost::optional<TopologyVersion> topologyVersion, bool* shouldShutdown) try { + boost::optional<TopologyVersion> topologyVersion, bool* shouldShutdown) noexcept try { invariant(*shouldShutdown == false); ServiceContext::UniqueOperationContext opCtx; try { @@ -120,6 +141,8 @@ std::shared_ptr<const IsMasterResponse> TopologyVersionObserver::_getIsMasterRes } invariant(opCtx); + + invariant(_replCoordinator); auto future = _replCoordinator->getIsMasterResponseFuture({}, topologyVersion); auto response = future.get(opCtx.get()); if (!response->isConfigSet()) { @@ -140,17 +163,10 @@ std::shared_ptr<const IsMasterResponse> TopologyVersionObserver::_getIsMasterRes } void TopologyVersionObserver::_workerThreadBody() noexcept { - invariant(_state.load() == State::kUninitialized); - // Creates a new client and makes `_observerClient` to point to it, which allows `shutdown()` // to access the client object. - Client::initThread(kTopologyVersionObserverName); - _observerClient = Client::getCurrent(); - - // `init()` must hold the mutex until the observer updates the state. - invariant(!_mutex.try_lock()); - // The following notifies `init()` that `_observerClient` is set and ready to use. - _state.store(State::kRunning); + invariant(_serviceContext); + ThreadClient tc(kTopologyVersionObserverName, _serviceContext); auto getTopologyVersion = [&]() -> boost::optional<TopologyVersion> { // Only the observer thread updates `_cache`, thus there is no need to hold the lock before @@ -161,37 +177,54 @@ void TopologyVersionObserver::_workerThreadBody() noexcept { return boost::none; }; - ON_BLOCK_EXIT([&] { - // Once the observer detects a shutdown, it must update the state first before attempting - // to acquire the lock. This is necessary to avoid deadlocks. - invariant(_state.load() == State::kRunning); - _state.store(State::kShutdown); + LOGV2_INFO(40445, + "Started {topologyVersionObserverName}", + "topologyVersionObserverName"_attr = toString()); - stdx::unique_lock lock(_mutex); + { + stdx::lock_guard lk(_mutex); + invariant(_state.load() == State::kUninitialized); + if (_shouldShutdown.load()) { + _state.store(State::kShutdown); + _cv.notify_all(); - // Invalidate the cache as it is no longer updated - _cache.reset(); + return; + } + + // The following notifies `init()` that `_observerClient` is set and ready to use. + _state.store(State::kRunning); + _observerClient = tc.get(); + _cv.notify_all(); + } + + ON_BLOCK_EXIT([&] { + { + stdx::lock_guard lk(_mutex); + invariant(_state.load() == State::kRunning); + _state.store(State::kShutdown); - // Client object is local to this thread, and is no longer be available. - _observerClient = nullptr; + // Invalidate the cache as it is no longer updated + _cache.reset(); + _observerClient = nullptr; + + _cv.notify_all(); + } LOGV2_INFO(40447, "Stopped {topologyVersionObserverName}", "topologyVersionObserverName"_attr = toString()); }); - bool shouldShutdown = false; - LOGV2_INFO(40445, - "Started {topologyVersionObserverName}", - "topologyVersionObserverName"_attr = toString()); - while (!shouldShutdown) { - auto response = _getIsMasterResponse(getTopologyVersion(), &shouldShutdown); - // Only update if the version is more recent than the cached version, or `_cache` is null. - if (!shouldShutdown && response != _cache) { - stdx::lock_guard lock(_mutex); - _cache = response; - } - } + bool receivedShutdownError; + do { + receivedShutdownError = false; + auto response = _getIsMasterResponse(getTopologyVersion(), &receivedShutdownError); + + stdx::lock_guard lk(_mutex); + _cache = response; + + // If either the global shutdown flag was set or we received a shutdown error, we're done + } while (!(receivedShutdownError || _shouldShutdown.load())); } } // namespace repl diff --git a/src/mongo/db/repl/topology_version_observer.h b/src/mongo/db/repl/topology_version_observer.h index afc55321ab2..908535194f0 100644 --- a/src/mongo/db/repl/topology_version_observer.h +++ b/src/mongo/db/repl/topology_version_observer.h @@ -69,22 +69,15 @@ constexpr auto kTopologyVersionObserverName = "TopologyVersionObserver"; */ class TopologyVersionObserver final { public: - TopologyVersionObserver() {} - - TopologyVersionObserver(const TopologyVersionObserver&) = delete; - - TopologyVersionObserver(TopologyVersionObserver&&) noexcept = delete; - - TopologyVersionObserver& operator=(const TopologyVersionObserver&) = delete; - - TopologyVersionObserver& operator=(TopologyVersionObserver&&) = delete; + TopologyVersionObserver() = default; ~TopologyVersionObserver() { auto state = _state.load(); invariant(state == State::kShutdown || state == State::kUninitialized); } - void init(ReplicationCoordinator* replCoordinator) noexcept; + void init(ServiceContext* serviceContext, + ReplicationCoordinator* replCoordinator = nullptr) noexcept; void shutdown() noexcept; @@ -106,7 +99,7 @@ private: }; std::shared_ptr<const IsMasterResponse> _getIsMasterResponse(boost::optional<TopologyVersion>, - bool*); + bool*) noexcept; void _workerThreadBody() noexcept; @@ -119,22 +112,33 @@ private: * is that the contention on this lock is insignificant. */ mutable Mutex _mutex = MONGO_MAKE_LATCH(kTopologyVersionObserverName); + stdx::condition_variable _cv; + + /** + * Tells the worker thread if it should continue to run + * + * This variable is set to true from false outside the worker thread + */ + AtomicWord<bool> _shouldShutdown; // The reference to the latest cached version of `IsMasterResponse` std::shared_ptr<const IsMasterResponse> _cache; - // Holds a reference to the observer client to allow `shutdown()` to stop the observer thread. - Client* _observerClient; - /** * Represents the current state of the observer. - * Note that the observer thread never updates the state. + * + * This variable is only changed from the worker thread */ AtomicWord<State> _state; + // Holds a reference to the observer client to allow `shutdown()` to stop the observer thread. + // This variable is only consistent when _state == State::kRunning and _mutex is acquired. + Client* _observerClient; + boost::optional<stdx::thread> _thread; - ReplicationCoordinator* _replCoordinator; + ServiceContext* _serviceContext = nullptr; + ReplicationCoordinator* _replCoordinator = nullptr; }; } // namespace repl diff --git a/src/mongo/db/repl/topology_version_observer_test.cpp b/src/mongo/db/repl/topology_version_observer_test.cpp index 81532419f16..ab60f0d03e7 100644 --- a/src/mongo/db/repl/topology_version_observer_test.cpp +++ b/src/mongo/db/repl/topology_version_observer_test.cpp @@ -87,8 +87,9 @@ public: getNet()->advanceTime(Date_t::now() + sleepTime); getNet()->exitNetwork(); + auto serviceContext = getServiceContext(); observer = std::make_unique<TopologyVersionObserver>(); - observer->init(replCoord); + observer->init(serviceContext, replCoord); } ~TopologyVersionObserverTest() { diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index db2956f0dbe..8878ff28f60 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -866,7 +866,8 @@ void execCommandDatabase(OperationContext* opCtx, BSONObjBuilder extraFieldsBuilder; auto startOperationTime = getClientOperationTime(opCtx); - auto invocation = command->parse(opCtx, request); + std::shared_ptr<CommandInvocation> invocation = command->parse(opCtx, request); + CommandInvocation::set(opCtx, invocation); OperationSessionInfoFromClient sessionOptions; diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index 3cb5015ff63..045cba37e40 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -364,7 +364,8 @@ void runCommand(OperationContext* opCtx, opCtx->setComment(commentField.wrap()); } - auto invocation = command->parse(opCtx, request); + std::shared_ptr<CommandInvocation> invocation = command->parse(opCtx, request); + CommandInvocation::set(opCtx, invocation); // Set the logical optype, command object and namespace as soon as we identify the command. If // the command does not define a fully-qualified namespace, set CurOp to the generic command |