summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBen Caimano <ben.caimano@10gen.com>2020-02-18 18:25:00 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-02-27 02:18:39 +0000
commit74961aa50ded11df8612bed9e0bd40f6c9466560 (patch)
tree933de51e64dae1d5b66bfbfd1e5ddf9e978beb07 /src
parent679d1e1028ddbdd1ac5778f3e0ab0cb3ffd5ee27 (diff)
downloadmongo-74961aa50ded11df8612bed9e0bd40f6c9466560.tar.gz
SERVER-46002 Attach core read mirroring functionality
Diffstat (limited to 'src')
-rw-r--r--src/mongo/bson/bsonobj.cpp20
-rw-r--r--src/mongo/bson/bsonobj.h5
-rw-r--r--src/mongo/db/SConscript22
-rw-r--r--src/mongo/db/commands.cpp42
-rw-r--r--src/mongo/db/commands.h26
-rw-r--r--src/mongo/db/commands/count_cmd.cpp22
-rw-r--r--src/mongo/db/commands/distinct.cpp19
-rw-r--r--src/mongo/db/commands/find_and_modify.cpp23
-rw-r--r--src/mongo/db/commands/find_cmd.cpp29
-rw-r--r--src/mongo/db/db.cpp11
-rw-r--r--src/mongo/db/mirror_maestro.cpp200
-rw-r--r--src/mongo/db/mirror_maestro.h8
-rw-r--r--src/mongo/db/mirror_maestro.idl7
-rw-r--r--src/mongo/db/query/query_planner.cpp2
-rw-r--r--src/mongo/db/repl/topology_version_observer.cpp163
-rw-r--r--src/mongo/db/repl/topology_version_observer.h36
-rw-r--r--src/mongo/db/repl/topology_version_observer_test.cpp3
-rw-r--r--src/mongo/db/service_entry_point_common.cpp3
-rw-r--r--src/mongo/s/commands/strategy.cpp3
19 files changed, 488 insertions, 156 deletions
diff --git a/src/mongo/bson/bsonobj.cpp b/src/mongo/bson/bsonobj.cpp
index 74d3edf1588..284058feecf 100644
--- a/src/mongo/bson/bsonobj.cpp
+++ b/src/mongo/bson/bsonobj.cpp
@@ -346,8 +346,7 @@ bool BSONObj::isFieldNamePrefixOf(const BSONObj& otherObj) const {
return !a.more();
}
-BSONObj BSONObj::extractFieldsUnDotted(const BSONObj& pattern) const {
- BSONObjBuilder b;
+void BSONObj::extractFieldsUndotted(BSONObjBuilder* b, const BSONObj& pattern) const {
BSONObjIterator i(pattern);
while (i.moreWithEOO()) {
BSONElement e = i.next();
@@ -355,13 +354,17 @@ BSONObj BSONObj::extractFieldsUnDotted(const BSONObj& pattern) const {
break;
BSONElement x = getField(e.fieldName());
if (!x.eoo())
- b.appendAs(x, "");
+ b->appendAs(x, "");
}
- return b.obj();
}
-BSONObj BSONObj::filterFieldsUndotted(const BSONObj& filter, bool inFilter) const {
+BSONObj BSONObj::extractFieldsUndotted(const BSONObj& pattern) const {
BSONObjBuilder b;
+ extractFieldsUndotted(&b, pattern);
+ return b.obj();
+}
+
+void BSONObj::filterFieldsUndotted(BSONObjBuilder* b, const BSONObj& filter, bool inFilter) const {
BSONObjIterator i(*this);
while (i.moreWithEOO()) {
BSONElement e = i.next();
@@ -369,8 +372,13 @@ BSONObj BSONObj::filterFieldsUndotted(const BSONObj& filter, bool inFilter) cons
break;
BSONElement x = filter.getField(e.fieldName());
if ((x.eoo() && !inFilter) || (!x.eoo() && inFilter))
- b.append(e);
+ b->append(e);
}
+}
+
+BSONObj BSONObj::filterFieldsUndotted(const BSONObj& filter, bool inFilter) const {
+ BSONObjBuilder b;
+ filterFieldsUndotted(&b, filter, inFilter);
return b.obj();
}
diff --git a/src/mongo/bson/bsonobj.h b/src/mongo/bson/bsonobj.h
index 215652bcd29..f22c9391bb6 100644
--- a/src/mongo/bson/bsonobj.h
+++ b/src/mongo/bson/bsonobj.h
@@ -52,6 +52,7 @@
namespace mongo {
+class BSONObjBuilder;
class BSONObjStlIterator;
class ExtendedCanonicalV200Generator;
class ExtendedRelaxedV200Generator;
@@ -393,9 +394,11 @@ public:
* this.extractFieldsUnDotted({b : "blah"}) -> {"" : 5}
*
*/
- BSONObj extractFieldsUnDotted(const BSONObj& pattern) const;
+ BSONObj extractFieldsUndotted(const BSONObj& pattern) const;
+ void extractFieldsUndotted(BSONObjBuilder* b, const BSONObj& pattern) const;
BSONObj filterFieldsUndotted(const BSONObj& filter, bool inFilter) const;
+ void filterFieldsUndotted(BSONObjBuilder* b, const BSONObj& filter, bool inFilter) const;
BSONElement getFieldUsingIndexNames(StringData fieldName, const BSONObj& indexKey) const;
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 193a759b5a5..f959c2520ae 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -488,20 +488,25 @@ env.Library(
source=[
'mirror_maestro.cpp',
env.Idlc('mirror_maestro.idl')[0],
+ 'mirroring_sampler.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/executor/task_executor_interface',
+ "$BUILD_DIR/mongo/rpc/protocol",
'service_context',
+ "$BUILD_DIR/mongo/util/net/network",
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/bson/util/bson_extract',
'$BUILD_DIR/mongo/executor/network_interface_factory',
'$BUILD_DIR/mongo/executor/thread_pool_task_executor',
'$BUILD_DIR/mongo/idl/idl_parser',
+ '$BUILD_DIR/mongo/util/concurrency/thread_pool',
+ 'commands/server_status',
'repl/replica_set_messages',
'repl/repl_coordinator_interface',
- '$BUILD_DIR/mongo/util/concurrency/thread_pool',
+ 'repl/topology_version_observer',
],
)
@@ -1824,19 +1829,6 @@ env.Library(
],
)
-env.Library(
- target='mirroring_sampler',
- source=[
- 'mirroring_sampler.cpp',
- ],
- LIBDEPS=[
- "$BUILD_DIR/mongo/base",
- "$BUILD_DIR/mongo/db/repl/replica_set_messages",
- "$BUILD_DIR/mongo/rpc/protocol",
- "$BUILD_DIR/mongo/util/net/network",
- ],
-)
-
envWithAsio = env.Clone()
envWithAsio.InjectThirdParty(libraries=['asio'])
@@ -1926,7 +1918,7 @@ envWithAsio.CppUnitTest(
'logical_session_id',
'logical_session_id_helpers',
'logical_time',
- 'mirroring_sampler',
+ 'mirror_maestro',
'namespace_string',
'op_observer',
'op_observer_impl',
diff --git a/src/mongo/db/commands.cpp b/src/mongo/db/commands.cpp
index 433bd033588..142037458bb 100644
--- a/src/mongo/db/commands.cpp
+++ b/src/mongo/db/commands.cpp
@@ -690,9 +690,26 @@ void CommandHelpers::handleMarkKillOnClientDisconnect(OperationContext* opCtx,
});
}
+namespace {
+// We store the CommandInvocation as a shared_ptr on the OperationContext in case we need to persist
+// the invocation past the lifetime of the op. If so, this shared_ptr can be copied off to another
+// thread. If not, there is only one shared_ptr and the invocation goes out of scope when the op
+// ends.
+auto invocationForOpCtx = OperationContext::declareDecoration<std::shared_ptr<CommandInvocation>>();
+} // namespace
+
//////////////////////////////////////////////////////////////
// CommandInvocation
+void CommandInvocation::set(OperationContext* opCtx,
+ std::shared_ptr<CommandInvocation> invocation) {
+ invocationForOpCtx(opCtx) = std::move(invocation);
+}
+
+std::shared_ptr<CommandInvocation> CommandInvocation::get(OperationContext* opCtx) {
+ return invocationForOpCtx(opCtx);
+}
+
CommandInvocation::~CommandInvocation() = default;
void CommandInvocation::checkAuthorization(OperationContext* opCtx,
@@ -737,13 +754,13 @@ public:
BasicCommandWithReplyBuilderInterface* command)
: CommandInvocation(command),
_command(command),
- _request(&request),
- _dbName(_request->getDatabase().toString()) {}
+ _request(request),
+ _dbName(_request.getDatabase().toString()) {}
private:
void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) override {
- opCtx->lockState()->setDebugInfo(redact(_request->body).toString());
- bool ok = _command->runWithReplyBuilder(opCtx, _dbName, _request->body, result);
+ opCtx->lockState()->setDebugInfo(redact(_request.body).toString());
+ bool ok = _command->runWithReplyBuilder(opCtx, _dbName, _request.body, result);
if (!ok) {
BSONObjBuilder bob = result->getBodyBuilder();
CommandHelpers::appendSimpleCommandStatus(bob, ok);
@@ -753,7 +770,7 @@ private:
void explain(OperationContext* opCtx,
ExplainOptions::Verbosity verbosity,
rpc::ReplyBuilderInterface* result) override {
- uassertStatusOK(_command->explain(opCtx, *_request, verbosity, result));
+ uassertStatusOK(_command->explain(opCtx, _request, verbosity, result));
}
NamespaceString ns() const override {
@@ -768,6 +785,15 @@ private:
return _command->supportsReadConcern(cmdObj(), level);
}
+ bool supportsReadMirroring() const override {
+ return _command->supportsReadMirroring(cmdObj());
+ }
+
+ void appendMirrorableRequest(BSONObjBuilder* bob) const override {
+ invariant(cmdObj().isOwned());
+ _command->appendMirrorableRequest(bob, cmdObj());
+ }
+
bool allowsAfterClusterTime() const override {
return _command->allowsAfterClusterTime(cmdObj());
}
@@ -778,15 +804,15 @@ private:
void doCheckAuthorization(OperationContext* opCtx) const override {
uassertStatusOK(_command->checkAuthForOperation(
- opCtx, _request->getDatabase().toString(), _request->body));
+ opCtx, _request.getDatabase().toString(), _request.body));
}
const BSONObj& cmdObj() const {
- return _request->body;
+ return _request.body;
}
BasicCommandWithReplyBuilderInterface* const _command;
- const OpMsgRequest* const _request;
+ const OpMsgRequest _request;
const std::string _dbName;
};
diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h
index 3eff8eb062d..a2ea871d67b 100644
--- a/src/mongo/db/commands.h
+++ b/src/mongo/db/commands.h
@@ -514,6 +514,9 @@ public:
virtual ~CommandInvocation();
+ static void set(OperationContext* opCtx, std::shared_ptr<CommandInvocation> invocation);
+ static std::shared_ptr<CommandInvocation> get(OperationContext* opCtx);
+
/**
* Runs the command, filling in result. Any exception thrown from here will cause result
* to be reset and filled in with the error. Non-const to permit modifying the request
@@ -555,13 +558,20 @@ public:
}
/**
- * Returns this invocation's support for readMirroring.
+ * Returns true if this invocation can be mirrored to secondaries.
*/
virtual bool supportsReadMirroring() const {
return false;
}
/**
+ * Appends to the given builder a form of this request that can be safely mirrored to
+ * secondaries for cache warming.
+ */
+ virtual void appendMirrorableRequest(BSONObjBuilder*) const {
+ MONGO_UNREACHABLE;
+ }
+
+ /**
* Returns true if command allows afterClusterTime in its readConcern. The command may not allow
* it if it is specifically intended not to take any LockManager locks. Waiting for
* afterClusterTime takes the MODE_IS lock.
@@ -713,6 +723,20 @@ public:
{kDefaultReadConcernNotPermitted}};
}
+ /**
+ * Returns true if the cmdObj can be mirrored to secondaries in some form.
+ */
+ virtual bool supportsReadMirroring(const BSONObj& cmdObj) const {
+ return false;
+ }
+
+ /**
+ * Appends to the given builder a modified form of cmdObj that can be safely mirrored to
+ * secondaries for cache warming.
+ */
+ virtual void appendMirrorableRequest(BSONObjBuilder*, const BSONObj&) const {
+ MONGO_UNREACHABLE;
+ }
+
virtual bool allowsAfterClusterTime(const BSONObj& cmdObj) const {
return true;
}
diff --git a/src/mongo/db/commands/count_cmd.cpp b/src/mongo/db/commands/count_cmd.cpp
index 14cdb0c0f98..2f2a7c4b09d 100644
--- a/src/mongo/db/commands/count_cmd.cpp
+++ b/src/mongo/db/commands/count_cmd.cpp
@@ -94,6 +94,10 @@ public:
return ReadConcernSupportResult::allSupportedAndDefaultPermitted();
}
+ bool supportsReadMirroring(const BSONObj&) const override {
+ return true;
+ }
+
ReadWriteType getReadWriteType() const override {
return ReadWriteType::kRead;
}
@@ -261,6 +265,24 @@ public:
return true;
}
+    /**
+     * Copies into 'bob' only those top-level fields of the count command 'cmdObj' that may be
+     * forwarded as part of a mirrored read.
+     */
+    void appendMirrorableRequest(BSONObjBuilder* bob, const BSONObj& cmdObj) const override {
+        // Built once: a pattern object whose field names form the allow-list
+        static const BSONObj kMirrorableKeys = [] {
+            BSONObjBuilder allowList;
+            for (const char* fieldName :
+                 {"count", "query", "skip", "limit", "hint", "collation"}) {
+                allowList.append(fieldName, 1);
+            }
+            return allowList.obj();
+        }();
+
+        // inFilter=true keeps exactly the fields of cmdObj that are named in the allow-list
+        cmdObj.filterFieldsUndotted(bob, kMirrorableKeys, true);
+    }
+
} cmdCount;
} // namespace
diff --git a/src/mongo/db/commands/distinct.cpp b/src/mongo/db/commands/distinct.cpp
index f580846dc0b..cbbaa62d243 100644
--- a/src/mongo/db/commands/distinct.cpp
+++ b/src/mongo/db/commands/distinct.cpp
@@ -93,6 +93,10 @@ public:
return ReadConcernSupportResult::allSupportedAndDefaultPermitted();
}
+ bool supportsReadMirroring(const BSONObj&) const override {
+ return true;
+ }
+
ReadWriteType getReadWriteType() const override {
return ReadWriteType::kRead;
}
@@ -310,6 +314,21 @@ public:
return true;
}
+    /**
+     * Copies into 'bob' only those top-level fields of the distinct command 'cmdObj' that may
+     * be forwarded as part of a mirrored read.
+     */
+    void appendMirrorableRequest(BSONObjBuilder* bob, const BSONObj& cmdObj) const override {
+        // Built once: a pattern object whose field names form the allow-list
+        static const BSONObj kMirrorableKeys = [] {
+            BSONObjBuilder allowList;
+            for (const char* fieldName : {"distinct", "key", "query", "collation"}) {
+                allowList.append(fieldName, 1);
+            }
+            return allowList.obj();
+        }();
+
+        // inFilter=true keeps exactly the fields of cmdObj that are named in the allow-list
+        cmdObj.filterFieldsUndotted(bob, kMirrorableKeys, true);
+    }
+
+
} distinctCmd;
} // namespace
diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp
index 02f1c8152ed..5d995f1c70e 100644
--- a/src/mongo/db/commands/find_and_modify.cpp
+++ b/src/mongo/db/commands/find_and_modify.cpp
@@ -229,6 +229,10 @@ public:
return true;
}
+ bool supportsReadMirroring(const BSONObj&) const override {
+ return true;
+ }
+
void addRequiredPrivileges(const std::string& dbname,
const BSONObj& cmdObj,
std::vector<Privilege>* out) const override {
@@ -527,6 +531,25 @@ public:
});
}
+    /**
+     * Appends to 'bob' a 'find' command that corresponds to the read portion of the
+     * findAndModify command 'cmdObj', so that mirroring it never performs a write.
+     *
+     * The collection name is taken from the first element of 'cmdObj', the optional 'query'
+     * becomes the find 'filter', and 'sort' and 'collation' are copied through unchanged.
+     */
+    void appendMirrorableRequest(BSONObjBuilder* bob, const BSONObj& cmdObj) const override {
+        // Top-level fields that are copied verbatim into the mirrored find
+        static const auto kMirrorableKeys = [] {
+            BSONObjBuilder keyBob;
+            keyBob.append("sort", 1);
+            keyBob.append("collation", 1);
+            return keyBob.obj();
+        }();
+
+        bob->append("find", cmdObj.firstElement().String());
+
+        // 'query' is optional for findAndModify. Calling Obj() on a missing (or non-object)
+        // element throws, so only forward it as the find filter when it is actually there.
+        const BSONElement queryElem = cmdObj["query"];
+        if (queryElem.isABSONObj()) {
+            bob->append("filter", queryElem.Obj());
+        }
+
+        cmdObj.filterFieldsUndotted(bob, kMirrorableKeys, true);
+
+        // Prevent the mirrored find from returning more than a single one-document batch
+        bob->append("batchSize", 1);
+        bob->append("singleBatch", true);
+    }
+
} cmdFindAndModify;
} // namespace
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index 0bb4abdccaa..5d5fcca7e8f 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -182,7 +182,9 @@ public:
class Invocation final : public CommandInvocation {
public:
Invocation(const FindCmd* definition, const OpMsgRequest& request, StringData dbName)
- : CommandInvocation(definition), _request(request), _dbName(dbName) {}
+ : CommandInvocation(definition), _request(request), _dbName(dbName) {
+ invariant(_request.body.isOwned());
+ }
private:
bool supportsWriteConcern() const override {
@@ -592,8 +594,31 @@ public:
firstBatch.done(cursorId, nss.ns());
}
+    /**
+     * Appends to 'bob' the mirrorable subset of this find request, followed by options that
+     * restrict the mirrored find to a single batch.
+     */
+    void appendMirrorableRequest(BSONObjBuilder* bob) const override {
+        // Built once: a pattern object whose field names form the allow-list
+        static const BSONObj kMirrorableKeys = [] {
+            BSONObjBuilder allowList;
+            for (const char* fieldName : {"find",
+                                          "filter",
+                                          "skip",
+                                          "limit",
+                                          "sort",
+                                          "hint",
+                                          "collation",
+                                          "min",
+                                          "max"}) {
+                allowList.append(fieldName, 1);
+            }
+            return allowList.obj();
+        }();
+
+        // inFilter=true keeps exactly the request fields that are named in the allow-list
+        const BSONObj& requestBody = _request.body;
+        requestBody.filterFieldsUndotted(bob, kMirrorableKeys, true);
+
+        // Tell the find to only return a single batch
+        bob->append("batchSize", 1);
+        bob->append("singleBatch", true);
+    }
+
private:
- const OpMsgRequest& _request;
+ const OpMsgRequest _request;
const StringData _dbName;
};
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index 886592fe567..e79178ed28c 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -277,13 +277,10 @@ void initWireSpec() {
void initializeCommandHooks(ServiceContext* serviceContext) {
class MongodCommandInvocationHooks final : public CommandInvocationHooks {
- void onBeforeRun(OperationContext* opCtx,
- const OpMsgRequest& request,
- CommandInvocation* invocation) {}
- void onAfterRun(OperationContext* opCtx,
- const OpMsgRequest& request,
- CommandInvocation* invocation) {
- MirrorMaestro::tryMirror(opCtx, request, invocation);
+ void onBeforeRun(OperationContext*, const OpMsgRequest&, CommandInvocation*) {}
+
+ void onAfterRun(OperationContext* opCtx, const OpMsgRequest&, CommandInvocation*) {
+ MirrorMaestro::tryMirrorRequest(opCtx);
}
};
diff --git a/src/mongo/db/mirror_maestro.cpp b/src/mongo/db/mirror_maestro.cpp
index a31bd43dd30..ce05f13fac3 100644
--- a/src/mongo/db/mirror_maestro.cpp
+++ b/src/mongo/db/mirror_maestro.cpp
@@ -40,10 +40,15 @@
#include "mongo/bson/bsonelement.h"
#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/bson/json.h"
#include "mongo/db/commands.h"
+#include "mongo/db/commands/server_status.h"
#include "mongo/db/mirror_maestro_gen.h"
+#include "mongo/db/mirroring_sampler.h"
#include "mongo/db/repl/is_master_response.h"
#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/repl/topology_version_observer.h"
+#include "mongo/executor/connection_pool.h"
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/remote_command_request.h"
#include "mongo/executor/thread_pool_task_executor.h"
@@ -53,6 +58,21 @@
namespace mongo {
+namespace {
+constexpr auto kMirrorMaestroName = "MirrorMaestro"_sd;
+constexpr auto kMirrorMaestroThreadPoolMaxThreads = 2ull; // Just enough to allow concurrency
+constexpr auto kMirrorMaestroConnPoolMinSize = 1ull; // Always be able to mirror eventually
+constexpr auto kMirrorMaestroConnPoolMaxSize = 4ull; // Never use more than a handful
+
+constexpr auto kMirroredReadsParamName = "mirrorReads"_sd;
+
+constexpr auto kMirroredReadsName = "mirroredReads"_sd;
+constexpr auto kMirroredReadsSeenKey = "seen"_sd;
+constexpr auto kMirroredReadsSentKey = "sent"_sd;
+constexpr auto kMirroredReadsResolvedKey = "resolved"_sd;
+
+MONGO_FAIL_POINT_DEFINE(mirrorMaestroExpectsResponse);
+
class MirrorMaestroImpl {
public:
/**
@@ -68,12 +88,17 @@ public:
/**
* Mirror only if this maestro has been initialized
*/
- void tryMirror(OperationContext* opCtx,
- const OpMsgRequest& request,
- const CommandInvocation* invocation) noexcept;
+ void tryMirror(std::shared_ptr<CommandInvocation> invocation) noexcept;
private:
/**
+ * Attempt to mirror invocation to a subset of hosts based on params
+ *
+ * This command is expected to only run on the _executor
+ */
+ void _mirror(std::shared_ptr<CommandInvocation> invocation, MirroredReadsParameters params);
+
+ /**
* An enum detailing the liveness of the Maestro
*
* The state transition map for liveness looks like so:
@@ -99,15 +124,47 @@ private:
// innately thread safe. If _isInitialized is false, there may not even be correct pointers to
// call member functions upon.
AtomicWord<bool> _isInitialized;
+ MirroredReadsServerParameter* _params = nullptr;
std::shared_ptr<executor::TaskExecutor> _executor;
+ repl::TopologyVersionObserver _topologyVersionObserver;
};
+const auto getMirrorMaestroImpl = ServiceContext::declareDecoration<MirrorMaestroImpl>();
-namespace {
-constexpr auto kMirrorMaestroName = "MirrorMaestro"_sd;
-constexpr auto kMirrorMaestroThreadPoolMaxThreads = 2ull;
+/**
+ * serverStatus section "mirroredReads", exposing the mirrored-read counters kept below.
+ *
+ * includeByDefault() returns false for this section.
+ */
+class MirroredReadsSection final : public ServerStatusSection {
+public:
+    using CounterT = long long;
+
+    MirroredReadsSection() : ServerStatusSection(kMirroredReadsName.toString()) {}
+
+    bool includeByDefault() const override {
+        return false;
+    }
+
+    /**
+     * Reports "seen" and "sent" always, and "resolved" only while the
+     * mirrorMaestroExpectsResponse fail point is active.
+     */
+    BSONObj generateSection(OperationContext* opCtx, const BSONElement&) const override {
+        BSONObjBuilder counters;
+        counters.append(kMirroredReadsSeenKey, seen.loadRelaxed());
+        counters.append(kMirroredReadsSentKey, sent.loadRelaxed());
+
+        // We only can see if the command resolved if we got a response
+        const bool reportResolved = MONGO_unlikely(mirrorMaestroExpectsResponse.shouldFail());
+        if (reportResolved) {
+            counters.append(kMirroredReadsResolvedKey, resolved.loadRelaxed());
+        }
+
+        return counters.obj();
+    }
+
+    // Updated directly by the mirroring code through gMirroredReadsSection
+    AtomicWord<CounterT> seen;
+    AtomicWord<CounterT> sent;
+    AtomicWord<CounterT> resolved;
+} gMirroredReadsSection;
+
+/**
+ * Parses 'obj' into a MirroredReadsParameters using the IDL-generated parser. The callers in
+ * this file expect failures to surface as an AssertionException.
+ */
+MirroredReadsParameters parseMirroredReadsParameters(const BSONObj& obj) {
+    IDLParserErrorContext errorCtx("mirrorReads");
+    return MirroredReadsParameters::parse(errorCtx, obj);
+}
-const auto getMirrorMaestroImpl = ServiceContext::declareDecoration<MirrorMaestroImpl>();
} // namespace
void MirroredReadsServerParameter::append(OperationContext*,
@@ -120,18 +177,21 @@ void MirroredReadsServerParameter::append(OperationContext*,
Status MirroredReadsServerParameter::set(const BSONElement& value) try {
auto obj = value.Obj();
- IDLParserErrorContext ctx(name());
- _data = MirroredReadsParameters::parse(ctx, obj);
+ _data = parseMirroredReadsParameters(obj);
return Status::OK();
} catch (const AssertionException& e) {
return e.toStatus();
}
-Status MirroredReadsServerParameter::setFromString(const std::string&) {
- using namespace fmt::literals;
- auto msg = "{:s} cannot be set from a string."_format(name());
- return {ErrorCodes::BadValue, msg};
+/**
+ * Sets the parameter from a JSON string: the text is converted with fromjson() and then
+ * parsed exactly like the BSON form. Any AssertionException is returned as a Status.
+ */
+Status MirroredReadsServerParameter::setFromString(const std::string& str) {
+    try {
+        auto parsedObj = fromjson(str);
+        _data = parseMirroredReadsParameters(parsedObj);
+    } catch (const AssertionException& ex) {
+        return ex.toStatus();
+    }
+
+    return Status::OK();
}
void MirrorMaestro::init(ServiceContext* serviceContext) noexcept {
@@ -151,35 +211,110 @@ void MirrorMaestro::shutdown(ServiceContext* serviceContext) noexcept {
impl.shutdown();
}
-void MirrorMaestro::tryMirror(OperationContext* opCtx,
- const OpMsgRequest& request,
- const CommandInvocation* invocation) noexcept {
+void MirrorMaestro::tryMirrorRequest(OperationContext* opCtx) noexcept {
auto& impl = getMirrorMaestroImpl(opCtx->getServiceContext());
- impl.tryMirror(opCtx, request, invocation);
+
+    // Hand the maestro its own reference to the invocation stored on this opCtx
+    impl.tryMirror(CommandInvocation::get(opCtx));
}
-void MirrorMaestroImpl::tryMirror(OperationContext* opCtx,
- const OpMsgRequest& request,
- const CommandInvocation* invocation) noexcept {
+/**
+ * Counts 'invocation' as seen when it supports read mirroring and, if the configured sampling
+ * rate is positive, schedules _mirror() for it on _executor.
+ *
+ * Does nothing when the maestro is not initialized or the invocation cannot be mirrored.
+ */
+void MirrorMaestroImpl::tryMirror(std::shared_ptr<CommandInvocation> invocation) noexcept {
if (!_isInitialized.load()) {
// If we're not even available, nothing to do
return;
}
+    invariant(invocation);
if (!invocation->supportsReadMirroring()) {
// That's all, folks
return;
}
- // TODO SERVER-45816 will add the sampling function and attach the command
- repl::IsMasterResponse* imr = nullptr;
+    gMirroredReadsSection.seen.fetchAndAdd(1);
+
+    auto params = _params->_data.get();
+    if (params.getSamplingRate() <= 0.0) {
+        // We'll never need to sample
+        return;
+    }
+
+    // There is the potential to actually mirror requests, so schedule the _mirror() invocation
+    // out-of-line. This means the command itself can return quickly and we do the arduous work of
+    // building new bsons and evaluating randomness in a less important context.
+    ExecutorFuture(_executor)  //
+        .getAsync([this, invocation = std::move(invocation), params = std::move(params)](
+                      const auto& status) mutable {
+            if (status.code() == ErrorCodes::ShutdownInProgress ||
+                status.code() == ErrorCodes::CallbackCanceled) {
+                // The executor is going away. Mirroring is best-effort, so drop this request
+                // rather than bring the process down from a shutdown race.
+                return;
+            }
+            invariant(status.isOK());
+
+            _mirror(std::move(invocation), std::move(params));
+        });
+}
+
+/**
+ * Builds the mirrorable form of 'invocation' and schedules it as a remote command against
+ * each host chosen by MirroringSampler for the configured sampling rate.
+ *
+ * Returns without sending anything when there is no cached IsMasterResponse or no host was
+ * selected. The "sent" counter is incremented for each request that was scheduled; the
+ * "resolved" counter is only updated while the mirrorMaestroExpectsResponse fail point is on.
+ */
+void MirrorMaestroImpl::_mirror(std::shared_ptr<CommandInvocation> invocation,
+                                MirroredReadsParameters params) {
+    auto imr = _topologyVersionObserver.getCached();
if (!imr) {
// If we don't have an IsMasterResponse, we can't know where to send our mirrored
// request
return;
}
- MONGO_UNREACHABLE;
+    auto hosts = MirroringSampler::getMirroringTargets(imr, params.getSamplingRate());
+    if (hosts.empty()) {
+        return;
+    }
+
+    auto payload = [&] {
+        BSONObjBuilder bob;
+
+        invocation->appendMirrorableRequest(&bob);
+
+        // Limit the maxTimeMS
+        bob.append("maxTimeMS", params.getMaxTimeMS());
+
+        {
+            // Set secondary read preference
+            BSONObjBuilder rpBob = bob.subobjStart("$readPreference");
+            rpBob.append("mode", "secondary");
+        }
+        return bob.obj();
+    }();
+
+    for (auto& host : hosts) {
+        // The callback has to be created per host: a function-local static would be
+        // initialized only once and keep reporting the first host it ever captured.
+        auto mirrorResponseCallback = [host](auto& args) {
+            if (MONGO_likely(!mirrorMaestroExpectsResponse.shouldFail())) {
+                // If we don't expect responses, then there is nothing to do here
+                return;
+            }
+
+            if (args.response.isOK()) {
+                gMirroredReadsSection.resolved.fetchAndAdd(1);
+                LOGV2_DEBUG(31457,
+                            4,
+                            "Received response from {host}, response: {response}",
+                            "host"_attr = host,
+                            "response"_attr = args.response);
+            }
+        };
+
+        auto newRequest = executor::RemoteCommandRequest(
+            host, invocation->ns().db().toString(), payload, nullptr);
+        LOGV2_DEBUG(31455,
+                    4,
+                    "Mirroring to {host}, request: {request}",
+                    "host"_attr = host,
+                    "request"_attr = newRequest);
+        auto status =
+            _executor->scheduleRemoteCommand(newRequest, mirrorResponseCallback).getStatus();
+        if (!status.isOK()) {
+            LOGV2_DEBUG(
+                31456, 2, "Failed to mirror read command due to {error}", "error"_attr = status);
+            continue;
+        }
+
+        gMirroredReadsSection.sent.fetchAndAdd(1);
+    }
}
void MirrorMaestroImpl::init(ServiceContext* serviceContext) noexcept {
@@ -203,7 +338,13 @@ void MirrorMaestroImpl::init(ServiceContext* serviceContext) noexcept {
} break;
};
- auto makeNet = [&] { return executor::makeNetworkInterface(kMirrorMaestroName.toString()); };
+ auto makeNet = [&] {
+ executor::ConnectionPool::Options options;
+ options.minConnections = kMirrorMaestroConnPoolMinSize;
+ options.maxConnections = kMirrorMaestroConnPoolMaxSize;
+ return executor::makeNetworkInterface(
+ kMirrorMaestroName.toString(), {}, {}, std::move(options));
+ };
auto makePool = [&] {
ThreadPool::Options options;
@@ -214,12 +355,17 @@ void MirrorMaestroImpl::init(ServiceContext* serviceContext) noexcept {
_executor = std::make_shared<executor::ThreadPoolTaskExecutor>(makePool(), makeNet());
_executor->startup();
+ _topologyVersionObserver.init(serviceContext);
+
+ _params =
+ ServerParameterSet::getGlobal()->get<MirroredReadsServerParameter>(kMirroredReadsParamName);
+ invariant(_params);
// Set _initGuard.liveness to kRunning
_initGuard.liveness = Liveness::kRunning;
- // Mark the maestro as initialized. It is now safe to call tryMirror(), use the _executor, or
- // otherwise rely on members to be alive and well.
+ // Mark the maestro as initialized. It is now safe to call tryMirrorRequest(), use the
+ // _executor, or otherwise rely on members to be alive and well.
_isInitialized.store(true);
}
@@ -240,6 +386,8 @@ void MirrorMaestroImpl::shutdown() noexcept {
} break;
};
+ _topologyVersionObserver.shutdown();
+
if (_executor) {
_executor->shutdown();
}
diff --git a/src/mongo/db/mirror_maestro.h b/src/mongo/db/mirror_maestro.h
index da99be92787..f595fd0bcc9 100644
--- a/src/mongo/db/mirror_maestro.h
+++ b/src/mongo/db/mirror_maestro.h
@@ -65,14 +65,12 @@ public:
static void shutdown(ServiceContext* serviceContext) noexcept;
/**
- * Check if a given invocation+request should be mirrored to secondaries, and schedule that work
- * if so.
+ * Check if the request associated with opCtx should be mirrored to secondaries, and schedule
+ * that work if so.
*
* This function will noop if the MirrorMaestro is currently being initialized or shutdown.
*/
- static void tryMirror(OperationContext* opCtx,
- const OpMsgRequest& request,
- const CommandInvocation* invocation) noexcept;
+ static void tryMirrorRequest(OperationContext* opCtx) noexcept;
};
} // namespace mongo
diff --git a/src/mongo/db/mirror_maestro.idl b/src/mongo/db/mirror_maestro.idl
index aa058107391..d486f5aba97 100644
--- a/src/mongo/db/mirror_maestro.idl
+++ b/src/mongo/db/mirror_maestro.idl
@@ -46,6 +46,13 @@ structs:
validator:
gte: 0.0
lte: 1.0
+ maxTimeMS:
+ description: >-
+ The maxTimeMS to apply to mirrored reads
+ type: int
+ default: 1000
+ validator:
+ gt: 0
server_parameters:
mirrorReads:
diff --git a/src/mongo/db/query/query_planner.cpp b/src/mongo/db/query/query_planner.cpp
index 48d8f49b519..df713b26676 100644
--- a/src/mongo/db/query/query_planner.cpp
+++ b/src/mongo/db/query/query_planner.cpp
@@ -142,7 +142,7 @@ string optionString(size_t options) {
}
static BSONObj getKeyFromQuery(const BSONObj& keyPattern, const BSONObj& query) {
- return query.extractFieldsUnDotted(keyPattern);
+ return query.extractFieldsUndotted(keyPattern);
}
static bool indexCompatibleMaxMin(const BSONObj& obj,
diff --git a/src/mongo/db/repl/topology_version_observer.cpp b/src/mongo/db/repl/topology_version_observer.cpp
index 9d7cef07fd2..635801a0fb7 100644
--- a/src/mongo/db/repl/topology_version_observer.cpp
+++ b/src/mongo/db/repl/topology_version_observer.cpp
@@ -40,62 +40,83 @@
namespace mongo {
namespace repl {
-void TopologyVersionObserver::init(ReplicationCoordinator* replCoordinator) noexcept {
- invariant(replCoordinator);
- stdx::lock_guard<Mutex> lk(_mutex);
- invariant(_state.load() == State::kUninitialized);
-
+void TopologyVersionObserver::init(ServiceContext* serviceContext,
+ ReplicationCoordinator* replCoordinator) noexcept {
LOGV2_INFO(40440,
"Starting {topologyVersionObserverName}",
"topologyVersionObserverName"_attr = toString());
- _replCoordinator = replCoordinator;
+
+ stdx::unique_lock lk(_mutex);
+
+ _serviceContext = serviceContext;
+ invariant(_serviceContext);
+
+ _replCoordinator =
+ replCoordinator ? replCoordinator : ReplicationCoordinator::get(_serviceContext);
+ invariant(_replCoordinator);
invariant(!_thread);
+ invariant(_state.load() == State::kUninitialized);
_thread = stdx::thread([&]() { this->_workerThreadBody(); });
- // Wait for the observer thread to update the status.
- while (_state.load() != State::kRunning) {
- }
+ _cv.wait(lk, [&] { return _state.load() != State::kUninitialized; });
}
void TopologyVersionObserver::shutdown() noexcept {
- stdx::unique_lock<Mutex> lk(_mutex);
- if (_state.load() == State::kUninitialized) {
- return;
- }
+ auto shouldWaitForShutdown = _shouldShutdown.swap(true);
+ if (shouldWaitForShutdown) {
+ // If we aren't the first ones to call shutdown, wait for the thread to stop
+ stdx::unique_lock lk(_mutex);
- // Check if another `shutdown()` has already completed.
- if (!_thread) {
+ _cv.wait(lk, [&] { return _state.load() == State::kShutdown; });
+ invariant(_state.load() == State::kShutdown);
return;
}
LOGV2_INFO(40441,
"Stopping {topologyVersionObserverName}",
"topologyVersionObserverName"_attr = toString());
- auto state = _state.load();
- invariant(state == State::kRunning || state == State::kShutdown);
-
- // Wait for the observer client to exit from its main loop.
- // Observer thread must update the state before attempting to acquire the mutex.
- while (_state.load() == State::kRunning) {
- invariant(_observerClient);
- _observerClient->lock();
- auto opCtx = _observerClient->getOperationContext();
- if (opCtx) {
- opCtx->markKilled(ErrorCodes::ShutdownInProgress);
- }
- _observerClient->unlock();
- }
- invariant(_state.load() == State::kShutdown);
- auto thread = std::exchange(_thread, boost::none);
- lk.unlock();
+ // Wait for the thread to stop and steal it to the local stack
+ auto thread = [&] {
+ stdx::unique_lock lk(_mutex);
+
+ _cv.wait(lk, [&] {
+ if (_state.load() != State::kRunning) {
+ // If we are no longer running, then we can safely join
+ return true;
+ }
+
+ // If we are still running, attempt to kill any opCtx
+ invariant(_observerClient);
+ stdx::lock_guard clientLk(*_observerClient);
+ auto opCtx = _observerClient->getOperationContext();
+ if (opCtx) {
+ _serviceContext->killOperation(clientLk, opCtx, ErrorCodes::ShutdownInProgress);
+ }
+
+ return false;
+ });
+ invariant(_state.load() == State::kShutdown);
+
+ return std::exchange(_thread, boost::none);
+ }();
+
+ if (!thread) {
+ // We never started
+ return;
+ }
- invariant(thread);
+ // Finally join
thread->join();
}
std::shared_ptr<const IsMasterResponse> TopologyVersionObserver::getCached() noexcept {
+ if (_state.load() != State::kRunning || _shouldShutdown.load()) {
+ // Early return if we know there isn't a worker
+ return {};
+ }
+
// Acquires the lock to avoid potential races with `_workerThreadBody()`.
// Atomics cannot be used here as `shared_ptr` cannot be atomically updated.
stdx::lock_guard<Mutex> lk(_mutex);
@@ -107,7 +128,7 @@ std::string TopologyVersionObserver::toString() const {
}
std::shared_ptr<const IsMasterResponse> TopologyVersionObserver::_getIsMasterResponse(
- boost::optional<TopologyVersion> topologyVersion, bool* shouldShutdown) try {
+ boost::optional<TopologyVersion> topologyVersion, bool* shouldShutdown) noexcept try {
invariant(*shouldShutdown == false);
ServiceContext::UniqueOperationContext opCtx;
try {
@@ -120,6 +141,8 @@ std::shared_ptr<const IsMasterResponse> TopologyVersionObserver::_getIsMasterRes
}
invariant(opCtx);
+
+ invariant(_replCoordinator);
auto future = _replCoordinator->getIsMasterResponseFuture({}, topologyVersion);
auto response = future.get(opCtx.get());
if (!response->isConfigSet()) {
@@ -140,17 +163,10 @@ std::shared_ptr<const IsMasterResponse> TopologyVersionObserver::_getIsMasterRes
}
void TopologyVersionObserver::_workerThreadBody() noexcept {
- invariant(_state.load() == State::kUninitialized);
-
// Creates a new client and makes `_observerClient` point to it, which allows `shutdown()`
// to access the client object.
- Client::initThread(kTopologyVersionObserverName);
- _observerClient = Client::getCurrent();
-
- // `init()` must hold the mutex until the observer updates the state.
- invariant(!_mutex.try_lock());
- // The following notifies `init()` that `_observerClient` is set and ready to use.
- _state.store(State::kRunning);
+ invariant(_serviceContext);
+ ThreadClient tc(kTopologyVersionObserverName, _serviceContext);
auto getTopologyVersion = [&]() -> boost::optional<TopologyVersion> {
// Only the observer thread updates `_cache`, thus there is no need to hold the lock before
@@ -161,37 +177,54 @@ void TopologyVersionObserver::_workerThreadBody() noexcept {
return boost::none;
};
- ON_BLOCK_EXIT([&] {
- // Once the observer detects a shutdown, it must update the state first before attempting
- // to acquire the lock. This is necessary to avoid deadlocks.
- invariant(_state.load() == State::kRunning);
- _state.store(State::kShutdown);
+ LOGV2_INFO(40445,
+ "Started {topologyVersionObserverName}",
+ "topologyVersionObserverName"_attr = toString());
- stdx::unique_lock lock(_mutex);
+ {
+ stdx::lock_guard lk(_mutex);
+ invariant(_state.load() == State::kUninitialized);
+ if (_shouldShutdown.load()) {
+ _state.store(State::kShutdown);
+ _cv.notify_all();
- // Invalidate the cache as it is no longer updated
- _cache.reset();
+ return;
+ }
+
+ // The following notifies `init()` that `_observerClient` is set and ready to use.
+ _state.store(State::kRunning);
+ _observerClient = tc.get();
+ _cv.notify_all();
+ }
+
+ ON_BLOCK_EXIT([&] {
+ {
+ stdx::lock_guard lk(_mutex);
+ invariant(_state.load() == State::kRunning);
+ _state.store(State::kShutdown);
- // Client object is local to this thread, and is no longer be available.
- _observerClient = nullptr;
+ // Invalidate the cache as it is no longer updated
+ _cache.reset();
+ _observerClient = nullptr;
+
+ _cv.notify_all();
+ }
LOGV2_INFO(40447,
"Stopped {topologyVersionObserverName}",
"topologyVersionObserverName"_attr = toString());
});
- bool shouldShutdown = false;
- LOGV2_INFO(40445,
- "Started {topologyVersionObserverName}",
- "topologyVersionObserverName"_attr = toString());
- while (!shouldShutdown) {
- auto response = _getIsMasterResponse(getTopologyVersion(), &shouldShutdown);
- // Only update if the version is more recent than the cached version, or `_cache` is null.
- if (!shouldShutdown && response != _cache) {
- stdx::lock_guard lock(_mutex);
- _cache = response;
- }
- }
+ bool receivedShutdownError;
+ do {
+ receivedShutdownError = false;
+ auto response = _getIsMasterResponse(getTopologyVersion(), &receivedShutdownError);
+
+ stdx::lock_guard lk(_mutex);
+ _cache = response;
+
+ // If either the global shutdown flag was set or we received a shutdown error, we're done
+ } while (!(receivedShutdownError || _shouldShutdown.load()));
}
} // namespace repl
diff --git a/src/mongo/db/repl/topology_version_observer.h b/src/mongo/db/repl/topology_version_observer.h
index afc55321ab2..908535194f0 100644
--- a/src/mongo/db/repl/topology_version_observer.h
+++ b/src/mongo/db/repl/topology_version_observer.h
@@ -69,22 +69,15 @@ constexpr auto kTopologyVersionObserverName = "TopologyVersionObserver";
*/
class TopologyVersionObserver final {
public:
- TopologyVersionObserver() {}
-
- TopologyVersionObserver(const TopologyVersionObserver&) = delete;
-
- TopologyVersionObserver(TopologyVersionObserver&&) noexcept = delete;
-
- TopologyVersionObserver& operator=(const TopologyVersionObserver&) = delete;
-
- TopologyVersionObserver& operator=(TopologyVersionObserver&&) = delete;
+ TopologyVersionObserver() = default;
~TopologyVersionObserver() {
auto state = _state.load();
invariant(state == State::kShutdown || state == State::kUninitialized);
}
- void init(ReplicationCoordinator* replCoordinator) noexcept;
+ void init(ServiceContext* serviceContext,
+ ReplicationCoordinator* replCoordinator = nullptr) noexcept;
void shutdown() noexcept;
@@ -106,7 +99,7 @@ private:
};
std::shared_ptr<const IsMasterResponse> _getIsMasterResponse(boost::optional<TopologyVersion>,
- bool*);
+ bool*) noexcept;
void _workerThreadBody() noexcept;
@@ -119,22 +112,33 @@ private:
* is that the contention on this lock is insignificant.
*/
mutable Mutex _mutex = MONGO_MAKE_LATCH(kTopologyVersionObserverName);
+ stdx::condition_variable _cv;
+
+ /**
+ * Tells the worker thread whether it should stop running (true means stop)
+ *
+ * This variable is set from false to true from outside the worker thread
+ */
+ AtomicWord<bool> _shouldShutdown;
// The reference to the latest cached version of `IsMasterResponse`
std::shared_ptr<const IsMasterResponse> _cache;
- // Holds a reference to the observer client to allow `shutdown()` to stop the observer thread.
- Client* _observerClient;
-
/**
* Represents the current state of the observer.
- * Note that the observer thread never updates the state.
+ *
+ * This variable is only changed from the worker thread
*/
AtomicWord<State> _state;
+ // Holds a reference to the observer client to allow `shutdown()` to stop the observer thread.
+ // Null while no worker runs. Only consistent when _state == State::kRunning with _mutex held.
+ Client* _observerClient = nullptr;
+
boost::optional<stdx::thread> _thread;
- ReplicationCoordinator* _replCoordinator;
+ ServiceContext* _serviceContext = nullptr;
+ ReplicationCoordinator* _replCoordinator = nullptr;
};
} // namespace repl
diff --git a/src/mongo/db/repl/topology_version_observer_test.cpp b/src/mongo/db/repl/topology_version_observer_test.cpp
index 81532419f16..ab60f0d03e7 100644
--- a/src/mongo/db/repl/topology_version_observer_test.cpp
+++ b/src/mongo/db/repl/topology_version_observer_test.cpp
@@ -87,8 +87,9 @@ public:
getNet()->advanceTime(Date_t::now() + sleepTime);
getNet()->exitNetwork();
+ auto serviceContext = getServiceContext();
observer = std::make_unique<TopologyVersionObserver>();
- observer->init(replCoord);
+ observer->init(serviceContext, replCoord);
}
~TopologyVersionObserverTest() {
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index db2956f0dbe..8878ff28f60 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -866,7 +866,8 @@ void execCommandDatabase(OperationContext* opCtx,
BSONObjBuilder extraFieldsBuilder;
auto startOperationTime = getClientOperationTime(opCtx);
- auto invocation = command->parse(opCtx, request);
+ std::shared_ptr<CommandInvocation> invocation = command->parse(opCtx, request);
+ CommandInvocation::set(opCtx, invocation);
OperationSessionInfoFromClient sessionOptions;
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index 3cb5015ff63..045cba37e40 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -364,7 +364,8 @@ void runCommand(OperationContext* opCtx,
opCtx->setComment(commentField.wrap());
}
- auto invocation = command->parse(opCtx, request);
+ std::shared_ptr<CommandInvocation> invocation = command->parse(opCtx, request);
+ CommandInvocation::set(opCtx, invocation);
// Set the logical optype, command object and namespace as soon as we identify the command. If
// the command does not define a fully-qualified namespace, set CurOp to the generic command