Diffstat (limited to 'src/mongo/s/mongos_main.cpp')
-rw-r--r--  src/mongo/s/mongos_main.cpp  989
1 file changed, 989 insertions(+), 0 deletions(-)
diff --git a/src/mongo/s/mongos_main.cpp b/src/mongo/s/mongos_main.cpp
new file mode 100644
index 00000000000..6894950f899
--- /dev/null
+++ b/src/mongo/s/mongos_main.cpp
@@ -0,0 +1,989 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/mongos_main.h"
+
+#include <boost/optional.hpp>
+#include <memory>
+
+#include "mongo/base/init.h"
+#include "mongo/base/initializer.h"
+#include "mongo/base/status.h"
+#include "mongo/client/connpool.h"
+#include "mongo/client/dbclient_rs.h"
+#include "mongo/client/global_conn_pool.h"
+#include "mongo/client/remote_command_targeter.h"
+#include "mongo/client/remote_command_targeter_factory_impl.h"
+#include "mongo/client/replica_set_monitor.h"
+#include "mongo/config.h"
+#include "mongo/db/audit.h"
+#include "mongo/db/auth/authorization_manager.h"
+#include "mongo/db/auth/authz_manager_external_state_s.h"
+#include "mongo/db/auth/user_cache_invalidator_job.h"
+#include "mongo/db/client.h"
+#include "mongo/db/client_metadata_propagation_egress_hook.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/ftdc/ftdc_mongos.h"
+#include "mongo/db/initialize_server_global_state.h"
+#include "mongo/db/initialize_server_security_state.h"
+#include "mongo/db/kill_sessions.h"
+#include "mongo/db/lasterror.h"
+#include "mongo/db/log_process_details.h"
+#include "mongo/db/logical_clock.h"
+#include "mongo/db/logical_session_cache_impl.h"
+#include "mongo/db/logical_time_metadata_hook.h"
+#include "mongo/db/logical_time_validator.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/read_write_concern_defaults_cache_lookup_mongos.h"
+#include "mongo/db/server_options.h"
+#include "mongo/db/service_context.h"
+#include "mongo/db/service_liaison_mongos.h"
+#include "mongo/db/session_killer.h"
+#include "mongo/db/startup_warnings_common.h"
+#include "mongo/db/wire_version.h"
+#include "mongo/executor/task_executor_pool.h"
+#include "mongo/logv2/log.h"
+#include "mongo/platform/process_id.h"
+#include "mongo/rpc/metadata/egress_metadata_hook_list.h"
+#include "mongo/s/balancer_configuration.h"
+#include "mongo/s/catalog_cache.h"
+#include "mongo/s/client/shard_factory.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/client/shard_remote.h"
+#include "mongo/s/client/sharding_connection_hook.h"
+#include "mongo/s/commands/kill_sessions_remote.h"
+#include "mongo/s/committed_optime_metadata_hook.h"
+#include "mongo/s/config_server_catalog_cache_loader.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/is_mongos.h"
+#include "mongo/s/mongos_options.h"
+#include "mongo/s/mongos_server_parameters_gen.h"
+#include "mongo/s/mongos_topology_coordinator.h"
+#include "mongo/s/query/cluster_cursor_cleanup_job.h"
+#include "mongo/s/query/cluster_cursor_manager.h"
+#include "mongo/s/service_entry_point_mongos.h"
+#include "mongo/s/session_catalog_router.h"
+#include "mongo/s/sessions_collection_sharded.h"
+#include "mongo/s/sharding_egress_metadata_hook_for_mongos.h"
+#include "mongo/s/sharding_initialization.h"
+#include "mongo/s/sharding_uptime_reporter.h"
+#include "mongo/s/transaction_router.h"
+#include "mongo/s/version_mongos.h"
+#include "mongo/scripting/dbdirectclient_factory.h"
+#include "mongo/scripting/engine.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/transport/transport_layer_manager.h"
+#include "mongo/util/admin_access.h"
+#include "mongo/util/cmdline_utils/censor_cmdline.h"
+#include "mongo/util/concurrency/idle_thread_block.h"
+#include "mongo/util/concurrency/thread_name.h"
+#include "mongo/util/exception_filter_win32.h"
+#include "mongo/util/exit.h"
+#include "mongo/util/fast_clock_source_factory.h"
+#include "mongo/util/latch_analyzer.h"
+#include "mongo/util/net/ocsp/ocsp_manager.h"
+#include "mongo/util/net/socket_exception.h"
+#include "mongo/util/net/socket_utils.h"
+#include "mongo/util/net/ssl_manager.h"
+#include "mongo/util/ntservice.h"
+#include "mongo/util/options_parser/startup_options.h"
+#include "mongo/util/periodic_runner.h"
+#include "mongo/util/periodic_runner_factory.h"
+#include "mongo/util/processinfo.h"
+#include "mongo/util/quick_exit.h"
+#include "mongo/util/signal_handlers.h"
+#include "mongo/util/stacktrace.h"
+#include "mongo/util/str.h"
+#include "mongo/util/text.h"
+#include "mongo/util/version.h"
+
+namespace mongo {
+
+using logv2::LogComponent;
+
+#if !defined(__has_feature)
+#define __has_feature(x) 0
+#endif
+
+// Failpoint that disables the config server update normally performed when a replica set
+// change is signaled on this mongos (see ShardingReplicaSetChangeListener).
+MONGO_FAIL_POINT_DEFINE(failReplicaSetChangeConfigServerUpdateHook);
+
+namespace {
+
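+// Failpoint that makes cleanupTask() pause for one second after killing all operations,
+// so tests can observe a shutdown in progress. Enabled the usual way, e.g.:
+//   db.adminCommand({configureFailPoint: "pauseWhileKillingOperationsAtShutdown",
+//                    mode: "alwaysOn"});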
+MONGO_FAIL_POINT_DEFINE(pauseWhileKillingOperationsAtShutdown);
+
+#if defined(_WIN32)
+const ntservice::NtServiceDefaultStrings defaultServiceStrings = {
+ L"MongoS", L"MongoDB Router", L"MongoDB Sharding Router"};
+#endif
+
+constexpr auto kSignKeysRetryInterval = Seconds{1};
+
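+// Periodically reports this mongos' uptime to the config servers. Emplaced in
+// runMongosServer() after startup options are parsed so it reports the real server port.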
+boost::optional<ShardingUptimeReporter> shardingUptimeReporter;
+
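+// Blocks until the LogicalTimeValidator has signing keys to gossip, polling every
+// kSignKeysRetryInterval. Returns early with success if the config servers do not support
+// OP_MSG, and with an error if 'opCtx' is interrupted.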
+Status waitForSigningKeys(OperationContext* opCtx) {
+ auto const shardRegistry = Grid::get(opCtx)->shardRegistry();
+
+ while (true) {
+        // The shard registry is expected to be up by this point.
+ invariant(shardRegistry->isUp());
+
+ auto configCS = shardRegistry->getConfigServerConnectionString();
+ auto rsm = ReplicaSetMonitor::get(configCS.getSetName());
+ // mongod will set minWireVersion == maxWireVersion for isMaster requests from
+ // internalClient.
+ if (rsm && (rsm->getMaxWireVersion() < WireVersion::SUPPORTS_OP_MSG)) {
+ LOGV2(22841, "Waiting for signing keys not supported by config shard");
+ return Status::OK();
+ }
+ auto stopStatus = opCtx->checkForInterruptNoAssert();
+ if (!stopStatus.isOK()) {
+ return stopStatus;
+ }
+
+ try {
+ if (LogicalTimeValidator::get(opCtx)->shouldGossipLogicalTime()) {
+ return Status::OK();
+ }
+ LOGV2(22842,
+ "Waiting for signing keys, sleeping for {signingKeysCheckInterval} and then"
+ " checking again",
+ "Waiting for signing keys, sleeping before checking again",
+ "signingKeysCheckInterval"_attr = Seconds(kSignKeysRetryInterval));
+ sleepFor(kSignKeysRetryInterval);
+ continue;
+ } catch (const DBException& ex) {
+ LOGV2_WARNING(22853,
+ "Error while waiting for signing keys, sleeping for"
+ " {signingKeysCheckInterval} and then checking again {error}",
+ "Error while waiting for signing keys, sleeping before checking again",
+ "signingKeysCheckInterval"_attr = Seconds(kSignKeysRetryInterval),
+ "error"_attr = ex);
+ sleepFor(kSignKeysRetryInterval);
+ continue;
+ }
+ }
+}
+
+
+/**
+ * Aborts all active transactions in the catalog that have not yet been committed.
+ *
+ * Outline:
+ * 1. Mark all sessions as killed and collect killTokens from each session.
+ * 2. Create a new Client in order not to pollute the current OperationContext.
+ * 3. Create a new OperationContext for each session to be killed and perform the setup
+ *    needed to abort transactions properly, such as setting the TxnNumber and attaching
+ *    the session to the OperationContext.
+ * 4. Send abortTransaction.
+ */
+void implicitlyAbortAllTransactions(OperationContext* opCtx) {
+ struct AbortTransactionDetails {
+ public:
+ AbortTransactionDetails(LogicalSessionId _lsid, SessionCatalog::KillToken _killToken)
+ : lsid(std::move(_lsid)), killToken(std::move(_killToken)) {}
+
+ LogicalSessionId lsid;
+ SessionCatalog::KillToken killToken;
+ };
+
+ const auto catalog = SessionCatalog::get(opCtx);
+
+ SessionKiller::Matcher matcherAllSessions(
+ KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)});
+
+ const auto abortDeadline =
+ opCtx->getServiceContext()->getFastClockSource()->now() + Seconds(15);
+
+ std::vector<AbortTransactionDetails> toKill;
+ catalog->scanSessions(matcherAllSessions, [&](const ObservableSession& session) {
+ toKill.emplace_back(session.getSessionId(),
+ session.kill(ErrorCodes::InterruptedAtShutdown));
+ });
+
+ auto newClient = opCtx->getServiceContext()->makeClient("ImplicitlyAbortTxnAtShutdown");
+ AlternativeClientRegion acr(newClient);
+
+ Status shutDownStatus(ErrorCodes::InterruptedAtShutdown,
+ "aborting transactions due to shutdown");
+
+ for (auto& killDetails : toKill) {
+ auto uniqueNewOpCtx = cc().makeOperationContext();
+ auto newOpCtx = uniqueNewOpCtx.get();
+
+ newOpCtx->setDeadlineByDate(abortDeadline, ErrorCodes::ExceededTimeLimit);
+
+ OperationContextSession sessionCtx(newOpCtx, std::move(killDetails.killToken));
+
+ auto session = OperationContextSession::get(newOpCtx);
+ newOpCtx->setLogicalSessionId(session->getSessionId());
+
+ auto txnRouter = TransactionRouter::get(newOpCtx);
+ if (txnRouter.isInitialized()) {
+ txnRouter.implicitlyAbortTransaction(newOpCtx, shutDownStatus);
+ }
+ }
+}
+
+/**
+ * NOTE: This function may be called at any time after registerShutdownTask is called below. It must
+ * not depend on the prior execution of mongo initializers or the existence of threads.
+ */
+void cleanupTask(const ShutdownTaskArgs& shutdownArgs) {
+ const auto serviceContext = getGlobalServiceContext();
+ {
+ // This client initiation pattern is only to be used here, with plans to eliminate this
+ // pattern down the line.
+ if (!haveClient())
+ Client::initThread(getThreadName());
+ Client& client = cc();
+
+ ServiceContext::UniqueOperationContext uniqueTxn;
+ OperationContext* opCtx = client.getOperationContext();
+ if (!opCtx) {
+ uniqueTxn = client.makeOperationContext();
+ opCtx = uniqueTxn.get();
+ }
+
+ Milliseconds quiesceTime;
+ if (shutdownArgs.quiesceTime) {
+ quiesceTime = *shutdownArgs.quiesceTime;
+ } else {
+            // The IDL spec guarantees that quiesceTime is populated for user-initiated
+            // shutdowns, so this shutdown must have been signaled.
+ invariant(!shutdownArgs.isUserInitiated);
+ quiesceTime = Milliseconds(mongosShutdownTimeoutMillisForSignaledShutdown.load());
+ }
+
+ // Enter quiesce mode so that existing and new short operations are allowed to finish.
+ // At this point, we will start responding to any isMaster request with ShutdownInProgress
+ // so that clients can re-route their operations.
+ if (auto mongosTopCoord = MongosTopologyCoordinator::get(opCtx)) {
+ mongosTopCoord->enterQuiesceModeAndWait(opCtx, quiesceTime);
+ }
+
+        // Shut down the TransportLayer so that new connections aren't accepted.
+ if (auto tl = serviceContext->getTransportLayer()) {
+ LOGV2_OPTIONS(
+ 22843, {LogComponent::kNetwork}, "shutdown: going to close all sockets...");
+
+ tl->shutdown();
+ }
+
+ try {
+ // Abort transactions while we can still send remote commands.
+ implicitlyAbortAllTransactions(opCtx);
+ } catch (const DBException& excep) {
+ LOGV2_WARNING(22854,
+ "Encountered {error} while trying to abort all active transactions",
+ "Error aborting all active transactions",
+ "error"_attr = excep);
+ }
+
+ if (auto lsc = LogicalSessionCache::get(serviceContext)) {
+ lsc->joinOnShutDown();
+ }
+
+ ReplicaSetMonitor::shutdown();
+
+ opCtx->setIsExecutingShutdown();
+
+ if (serviceContext) {
+ serviceContext->setKillAllOperations();
+
+ if (MONGO_unlikely(pauseWhileKillingOperationsAtShutdown.shouldFail())) {
+ LOGV2(4701800, "pauseWhileKillingOperationsAtShutdown failpoint enabled");
+ sleepsecs(1);
+ }
+ }
+
+        // Perform all shutdown operations after setKillAllOperations is called, in order to
+        // ensure that any pending threads are about to terminate.
+
+ if (auto validator = LogicalTimeValidator::get(serviceContext)) {
+ validator->shutDown();
+ }
+
+ if (auto cursorManager = Grid::get(opCtx)->getCursorManager()) {
+ cursorManager->shutdown(opCtx);
+ }
+
+ if (auto pool = Grid::get(opCtx)->getExecutorPool()) {
+ pool->shutdownAndJoin();
+ }
+
+ if (auto catalog = Grid::get(opCtx)->catalogClient()) {
+ catalog->shutDown(opCtx);
+ }
+
+ if (auto shardRegistry = Grid::get(opCtx)->shardRegistry()) {
+ shardRegistry->shutdown();
+ }
+
+ if (Grid::get(serviceContext)->isShardingInitialized()) {
+ CatalogCacheLoader::get(serviceContext).shutDown();
+ }
+
+#if __has_feature(address_sanitizer)
+    // When running under AddressSanitizer, we get false-positive leak reports due to
+    // disorder around the lifecycle of a connection and request, so we try much harder to
+    // drain active connections from the server before really shutting down.
+
+    // Shut down the Service Entry Point and its sessions, giving it a grace period to
+    // complete.
+ if (auto sep = serviceContext->getServiceEntryPoint()) {
+ if (!sep->shutdown(Seconds(10))) {
+ LOGV2_OPTIONS(22844,
+ {LogComponent::kNetwork},
+ "Service entry point did not shutdown within the time limit");
+ }
+ }
+
+    // Shut down the service executor and wait for it to exit.
+ if (auto svcExec = serviceContext->getServiceExecutor()) {
+ Status status = svcExec->shutdown(Seconds(5));
+ if (!status.isOK()) {
+ LOGV2_OPTIONS(22845,
+ {LogComponent::kNetwork},
+ "Service executor did not shutdown within the time limit",
+ "error"_attr = status);
+ }
+ }
+#endif
+
+    // Shut down full-time diagnostic data capture (FTDC).
+ stopMongoSFTDC();
+ }
+
+ audit::logShutdown(Client::getCurrent());
+
+#ifndef MONGO_CONFIG_USE_RAW_LATCHES
+ LatchAnalyzer::get(serviceContext).dump();
+#endif
+}
+
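+// Builds the sharding components (shard factory, catalog cache, shard registry) and
+// initializes the global sharding state, then blocks until the shard registry has loaded,
+// signing keys are available, and the routing table and connection pools are pre-warmed.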
+Status initializeSharding(OperationContext* opCtx) {
+ auto targeterFactory = std::make_unique<RemoteCommandTargeterFactoryImpl>();
+ auto targeterFactoryPtr = targeterFactory.get();
+
+ ShardFactory::BuilderCallable setBuilder = [targeterFactoryPtr](
+ const ShardId& shardId,
+ const ConnectionString& connStr) {
+ return std::make_unique<ShardRemote>(shardId, connStr, targeterFactoryPtr->create(connStr));
+ };
+
+ ShardFactory::BuilderCallable masterBuilder = [targeterFactoryPtr](
+ const ShardId& shardId,
+ const ConnectionString& connStr) {
+ return std::make_unique<ShardRemote>(shardId, connStr, targeterFactoryPtr->create(connStr));
+ };
+
+ ShardFactory::BuildersMap buildersMap{
+ {ConnectionString::SET, std::move(setBuilder)},
+ {ConnectionString::MASTER, std::move(masterBuilder)},
+ };
+
+ auto shardFactory =
+ std::make_unique<ShardFactory>(std::move(buildersMap), std::move(targeterFactory));
+
+ CatalogCacheLoader::set(opCtx->getServiceContext(),
+ std::make_unique<ConfigServerCatalogCacheLoader>());
+
+ auto catalogCache = std::make_unique<CatalogCache>(CatalogCacheLoader::get(opCtx));
+
+ // List of hooks which will be called by the ShardRegistry when it discovers a shard has been
+ // removed.
+ std::vector<ShardRegistry::ShardRemovalHook> shardRemovalHooks = {
+        // Invalidate appropriate entries in the catalog cache when a shard is removed. It's
+        // safe to capture the catalog cache pointer since the Grid (and therefore the
+        // CatalogCache and ShardRegistry) is never destroyed.
+ [catCache = catalogCache.get()](const ShardId& removedShard) {
+ catCache->invalidateEntriesThatReferenceShard(removedShard);
+ }};
+
+ if (mongosGlobalParams.configdbs.type() == ConnectionString::INVALID) {
+ return {ErrorCodes::BadValue, "Unrecognized connection string."};
+ }
+
+ auto shardRegistry = std::make_unique<ShardRegistry>(
+ std::move(shardFactory), mongosGlobalParams.configdbs, std::move(shardRemovalHooks));
+
+ Status status = initializeGlobalShardingState(
+ opCtx,
+ generateDistLockProcessId(opCtx),
+ std::move(catalogCache),
+ std::move(shardRegistry),
+ [opCtx]() {
+ auto hookList = std::make_unique<rpc::EgressMetadataHookList>();
+ hookList->addHook(
+ std::make_unique<rpc::LogicalTimeMetadataHook>(opCtx->getServiceContext()));
+ hookList->addHook(
+ std::make_unique<rpc::CommittedOpTimeMetadataHook>(opCtx->getServiceContext()));
+ hookList->addHook(std::make_unique<rpc::ClientMetadataPropagationEgressHook>());
+ hookList->addHook(std::make_unique<rpc::ShardingEgressMetadataHookForMongos>(
+ opCtx->getServiceContext()));
+ return hookList;
+ },
+ boost::none);
+
+ if (!status.isOK()) {
+ return status;
+ }
+
+ status = waitForShardRegistryReload(opCtx);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ status = waitForSigningKeys(opCtx);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ status = preCacheMongosRoutingInfo(opCtx);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ status = preWarmConnectionPool(opCtx);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ Grid::get(opCtx)->setShardingInitialized();
+
+ return Status::OK();
+}
+
+void initWireSpec() {
+ WireSpec& spec = WireSpec::instance();
+
+    // Since the upgrade order calls for upgrading mongos last, it only needs to speak the
+    // latest wire version. This ensures that users will get errors if they upgrade in the
+    // wrong order.
+ spec.outgoing.minWireVersion = LATEST_WIRE_VERSION;
+ spec.outgoing.maxWireVersion = LATEST_WIRE_VERSION;
+
+ spec.isInternalClient = true;
+}
+
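+/**
+ * Listens for replica set membership changes reported by the ReplicaSetMonitor and
+ * propagates them to the shard registry. Confirmed changes are additionally written to the
+ * config servers; per-set state in _updateStates coalesces concurrent updates for the same
+ * set so they are not sent out of order.
+ */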
+class ShardingReplicaSetChangeListener final
+ : public ReplicaSetChangeNotifier::Listener,
+ public std::enable_shared_from_this<ShardingReplicaSetChangeListener> {
+public:
+ ShardingReplicaSetChangeListener(ServiceContext* serviceContext)
+ : _serviceContext(serviceContext) {}
+ ~ShardingReplicaSetChangeListener() final = default;
+
+ void onFoundSet(const Key& key) noexcept final {}
+
+ void onConfirmedSet(const State& state) noexcept final {
+ auto connStr = state.connStr;
+ try {
+ LOGV2(471693,
+ "Updating the shard registry with confirmed replica set",
+ "connectionString"_attr = connStr);
+ Grid::get(_serviceContext)->shardRegistry()->updateReplSetHosts(connStr);
+ } catch (const ExceptionForCat<ErrorCategory::ShutdownError>& e) {
+ LOGV2(471694,
+ "Unable to update the shard registry with confirmed replica set",
+ "error"_attr = e);
+ }
+
+ auto setName = connStr.getSetName();
+ bool updateInProgress = false;
+ {
+ stdx::lock_guard lock(_mutex);
+ if (!_hasUpdateState(lock, setName)) {
+ _updateStates.emplace(setName, std::make_shared<ReplSetConfigUpdateState>());
+ }
+ auto updateState = _updateStates.at(setName);
+ updateState->nextUpdateToSend = connStr;
+ updateInProgress = updateState->updateInProgress;
+ }
+
+ if (!updateInProgress) {
+ _scheduleUpdateConfigServer(setName);
+ }
+ }
+
+ void onPossibleSet(const State& state) noexcept final {
+ try {
+ Grid::get(_serviceContext)->shardRegistry()->updateReplSetHosts(state.connStr);
+ } catch (const DBException& ex) {
+ LOGV2_DEBUG(22849,
+ 2,
+ "Unable to update sharding state with possible replica set due to {error}",
+ "Unable to update sharding state with possible replica set",
+ "error"_attr = ex);
+ }
+ }
+
+ void onDroppedSet(const Key& key) noexcept final {}
+
+private:
+    // Schedules updates for replica set 'setName' on the config server. Loosely preserves
+    // ordering of update execution: newer updates will not be overwritten by older updates
+    // in config.shards.
+ void _scheduleUpdateConfigServer(std::string setName) {
+ ConnectionString update;
+ {
+ stdx::lock_guard lock(_mutex);
+ if (!_hasUpdateState(lock, setName)) {
+ return;
+ }
+ auto updateState = _updateStates.at(setName);
+ if (updateState->updateInProgress) {
+ return;
+ }
+ updateState->updateInProgress = true;
+ update = updateState->nextUpdateToSend.get();
+ updateState->nextUpdateToSend = boost::none;
+ }
+
+ auto executor = Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor();
+ auto schedStatus =
+ executor
+ ->scheduleWork([self = shared_from_this(), setName, update](auto args) {
+ self->_updateConfigServer(args.status, setName, update);
+ })
+ .getStatus();
+ if (ErrorCodes::isCancelationError(schedStatus.code())) {
+ LOGV2_DEBUG(22848,
+ 2,
+ "Unable to schedule updating sharding state with confirmed replica set due"
+ " to {error}",
+ "Unable to schedule updating sharding state with confirmed replica set",
+ "error"_attr = schedStatus);
+ return;
+ }
+ uassertStatusOK(schedStatus);
+ }
+
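+    // Executor callback that performs the config server update for 'setName'. On a
+    // cancellation error it drops the per-set update state; otherwise it honors the
+    // failReplicaSetChangeConfigServerUpdateHook failpoint (skipping the write) and hands
+    // off to _endUpdateConfigServer() to reschedule any update that arrived meanwhile.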
+ void _updateConfigServer(Status status, std::string setName, ConnectionString update) {
+ if (ErrorCodes::isCancelationError(status.code())) {
+ stdx::lock_guard lock(_mutex);
+ _updateStates.erase(setName);
+ return;
+ }
+
+ if (MONGO_unlikely(failReplicaSetChangeConfigServerUpdateHook.shouldFail())) {
+ _endUpdateConfigServer(setName, update);
+ return;
+ }
+
+ try {
+ LOGV2(22846,
+ "Updating sharding state with confirmed replica set",
+ "connectionString"_attr = update);
+ ShardRegistry::updateReplicaSetOnConfigServer(_serviceContext, update);
+ } catch (const ExceptionForCat<ErrorCategory::ShutdownError>& e) {
+ LOGV2(22847,
+ "Unable to update sharding state with confirmed replica set",
+ "error"_attr = e);
+ } catch (...) {
+ _endUpdateConfigServer(setName, update);
+ throw;
+ }
+ _endUpdateConfigServer(setName, update);
+ }
+
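+    // Marks the update for 'setName' as complete and, if a newer connection string was
+    // recorded while the update was in flight, schedules another round on the fixed
+    // executor.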
+ void _endUpdateConfigServer(std::string setName, ConnectionString update) {
+ bool moreUpdates = false;
+ {
+ stdx::lock_guard lock(_mutex);
+ invariant(_hasUpdateState(lock, setName));
+ auto updateState = _updateStates.at(setName);
+ updateState->updateInProgress = false;
+ moreUpdates = (updateState->nextUpdateToSend != boost::none);
+ if (!moreUpdates) {
+ _updateStates.erase(setName);
+ }
+ }
+ if (moreUpdates) {
+ auto executor = Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor();
+ executor->schedule([self = shared_from_this(), setName](auto args) {
+ self->_scheduleUpdateConfigServer(setName);
+ });
+ }
+ }
+
+    // Returns true if a ReplSetConfigUpdateState exists for replica set 'setName'.
+ bool _hasUpdateState(WithLock, std::string setName) {
+ return (_updateStates.find(setName) != _updateStates.end());
+ }
+
+ ServiceContext* _serviceContext;
+
+ mutable Mutex _mutex = MONGO_MAKE_LATCH("ShardingReplicaSetChangeListenerMongod::mutex");
+
+ struct ReplSetConfigUpdateState {
+        // True when an update to config.shards is in progress.
+ bool updateInProgress = false;
+ boost::optional<ConnectionString> nextUpdateToSend;
+ };
+ stdx::unordered_map<std::string, std::shared_ptr<ReplSetConfigUpdateState>> _updateStates;
+};
+
+ExitCode runMongosServer(ServiceContext* serviceContext) {
+ ThreadClient tc("mongosMain", serviceContext);
+
+ logShardingVersionInfo(nullptr);
+
+ initWireSpec();
+
+ // Set up the periodic runner for background job execution
+ {
+ auto runner = makePeriodicRunner(serviceContext);
+ serviceContext->setPeriodicRunner(std::move(runner));
+ }
+
+ OCSPManager::get()->startThreadPool();
+
+ serviceContext->setServiceEntryPoint(std::make_unique<ServiceEntryPointMongos>(serviceContext));
+
+ auto tl =
+ transport::TransportLayerManager::createWithConfig(&serverGlobalParams, serviceContext);
+ auto res = tl->setup();
+ if (!res.isOK()) {
+ LOGV2_ERROR(22856,
+ "Error setting up listener: {error}",
+ "Error setting up listener",
+ "error"_attr = res);
+ return EXIT_NET_ERROR;
+ }
+ serviceContext->setTransportLayer(std::move(tl));
+
+ auto unshardedHookList = std::make_unique<rpc::EgressMetadataHookList>();
+ unshardedHookList->addHook(std::make_unique<rpc::LogicalTimeMetadataHook>(serviceContext));
+ unshardedHookList->addHook(std::make_unique<rpc::ClientMetadataPropagationEgressHook>());
+ unshardedHookList->addHook(
+ std::make_unique<rpc::ShardingEgressMetadataHookForMongos>(serviceContext));
+ unshardedHookList->addHook(std::make_unique<rpc::CommittedOpTimeMetadataHook>(serviceContext));
+
+ // Add sharding hooks to both connection pools - ShardingConnectionHook includes auth hooks
+ globalConnPool.addHook(new ShardingConnectionHook(std::move(unshardedHookList)));
+
+ auto shardedHookList = std::make_unique<rpc::EgressMetadataHookList>();
+ shardedHookList->addHook(std::make_unique<rpc::LogicalTimeMetadataHook>(serviceContext));
+ shardedHookList->addHook(std::make_unique<rpc::ClientMetadataPropagationEgressHook>());
+ shardedHookList->addHook(
+ std::make_unique<rpc::ShardingEgressMetadataHookForMongos>(serviceContext));
+ shardedHookList->addHook(std::make_unique<rpc::CommittedOpTimeMetadataHook>(serviceContext));
+
+    // Hook up a listener for changes from the ReplicaSetMonitor.
+    // It will last for the scope of this function, i.e. until shutdown finishes.
+ auto shardingRSCL =
+ ReplicaSetMonitor::getNotifier().makeListener<ShardingReplicaSetChangeListener>(
+ serviceContext);
+
+    // The mongos connection pools already take care of authenticating new connections, so
+    // the replica set connections shouldn't need to.
+ DBClientReplicaSet::setAuthPooledSecondaryConn(false);
+
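+    // A hostname is needed so this mongos can identify itself (e.g. to the sharding uptime
+    // reporter); refuse to start without one.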
+ if (getHostName().empty()) {
+ quickExit(EXIT_BADOPTIONS);
+ }
+
+ LogicalClock::set(serviceContext, std::make_unique<LogicalClock>(serviceContext));
+
+ ReadWriteConcernDefaults::create(serviceContext, readWriteConcernDefaultsCacheLookupMongoS);
+
+ auto opCtxHolder = tc->makeOperationContext();
+ auto const opCtx = opCtxHolder.get();
+
+ {
+ Status status = initializeSharding(opCtx);
+ if (!status.isOK()) {
+ if (status == ErrorCodes::CallbackCanceled) {
+ invariant(globalInShutdownDeprecated());
+ LOGV2(22850, "Shutdown called before mongos finished starting up");
+ return EXIT_CLEAN;
+ }
+ LOGV2_ERROR(22857,
+ "Error initializing sharding system: {error}",
+ "Error initializing sharding system",
+ "error"_attr = status);
+ return EXIT_SHARDING_ERROR;
+ }
+
+ Grid::get(serviceContext)
+ ->getBalancerConfiguration()
+ ->refreshAndCheck(opCtx)
+ .transitional_ignore();
+
+ try {
+ ReadWriteConcernDefaults::get(serviceContext).refreshIfNecessary(opCtx);
+ } catch (const DBException& ex) {
+ LOGV2_WARNING(22855,
+ "Error loading read and write concern defaults at startup",
+ "error"_attr = redact(ex));
+ }
+ }
+
+ startMongoSFTDC();
+
+ if (mongosGlobalParams.scriptingEnabled) {
+ ScriptEngine::setup();
+ }
+
+ Status status = AuthorizationManager::get(serviceContext)->initialize(opCtx);
+ if (!status.isOK()) {
+ LOGV2_ERROR(22858,
+ "Error initializing authorization data: {error}",
+ "Error initializing authorization data",
+ "error"_attr = status);
+ return EXIT_SHARDING_ERROR;
+ }
+
+ // Construct the sharding uptime reporter after the startup parameters have been parsed in order
+ // to ensure that it picks up the server port instead of reporting the default value.
+ shardingUptimeReporter.emplace();
+ shardingUptimeReporter->startPeriodicThread();
+
+ clusterCursorCleanupJob.go();
+
+ UserCacheInvalidator cacheInvalidatorThread(AuthorizationManager::get(serviceContext));
+ cacheInvalidatorThread.initialize(opCtx);
+ cacheInvalidatorThread.go();
+
+ PeriodicTask::startRunningPeriodicTasks();
+
+ SessionKiller::set(serviceContext,
+ std::make_shared<SessionKiller>(serviceContext, killSessionsRemote));
+
+ LogicalSessionCache::set(
+ serviceContext,
+ std::make_unique<LogicalSessionCacheImpl>(std::make_unique<ServiceLiaisonMongos>(),
+ std::make_unique<SessionsCollectionSharded>(),
+ RouterSessionCatalog::reapSessionsOlderThan));
+
+ status = serviceContext->getServiceExecutor()->start();
+ if (!status.isOK()) {
+ LOGV2_ERROR(22859,
+ "Error starting service executor: {error}",
+ "Error starting service executor",
+ "error"_attr = redact(status));
+ return EXIT_NET_ERROR;
+ }
+
+ status = serviceContext->getServiceEntryPoint()->start();
+ if (!status.isOK()) {
+ LOGV2_ERROR(22860,
+ "Error starting service entry point: {error}",
+ "Error starting service entry point",
+ "error"_attr = redact(status));
+ return EXIT_NET_ERROR;
+ }
+
+ status = serviceContext->getTransportLayer()->start();
+ if (!status.isOK()) {
+ LOGV2_ERROR(22861,
+ "Error starting transport layer: {error}",
+ "Error starting transport layer",
+ "error"_attr = redact(status));
+ return EXIT_NET_ERROR;
+ }
+
+ serviceContext->notifyStartupComplete();
+
+#if !defined(_WIN32)
+ signalForkSuccess();
+#else
+ if (ntservice::shouldStartService()) {
+ ntservice::reportStatus(SERVICE_RUNNING);
+ LOGV2(22851, "Service running");
+ }
+#endif
+
+ // Block until shutdown.
+ MONGO_IDLE_THREAD_BLOCK;
+ return waitForShutdown();
+}
+
+#if defined(_WIN32)
+ExitCode initService() {
+ return runMongosServer(getGlobalServiceContext());
+}
+#endif
+
+/**
+ * This function should contain the startup "actions" that we take based on the startup config. It
+ * is intended to separate the actions from "storage" and "validation" of our startup configuration.
+ */
+void startupConfigActions(const std::vector<std::string>& argv) {
+#if defined(_WIN32)
+ std::vector<std::string> disallowedOptions;
+ disallowedOptions.push_back("upgrade");
+ ntservice::configureService(
+ initService, moe::startupOptionsParsed, defaultServiceStrings, disallowedOptions, argv);
+#endif
+}
+
+std::unique_ptr<AuthzManagerExternalState> createAuthzManagerExternalStateMongos() {
+ return std::make_unique<AuthzManagerExternalStateMongos>();
+}
+
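+// Validates the config server connection string (localhost and remote addresses may not be
+// mixed), handles Windows service startup, and then runs the server via runMongosServer().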
+ExitCode main(ServiceContext* serviceContext) {
+ serviceContext->setFastClockSource(FastClockSourceFactory::create(Milliseconds{10}));
+
+ auto const shardingContext = Grid::get(serviceContext);
+
+    // Either all config servers are addressed via localhost or none are; mixing is not
+    // allowed.
+ std::vector<HostAndPort> configServers = mongosGlobalParams.configdbs.getServers();
+ for (std::vector<HostAndPort>::const_iterator it = configServers.begin();
+ it != configServers.end();
+ ++it) {
+ const HostAndPort& configAddr = *it;
+
+ if (it == configServers.begin()) {
+ shardingContext->setAllowLocalHost(configAddr.isLocalHost());
+ }
+
+ if (configAddr.isLocalHost() != shardingContext->allowLocalHost()) {
+ LOGV2_OPTIONS(22852,
+ {LogComponent::kDefault},
+ "cannot mix localhost and ip addresses in configdbs");
+ return EXIT_BADOPTIONS;
+ }
+ }
+
+#if defined(_WIN32)
+ if (ntservice::shouldStartService()) {
+ ntservice::startService();
+ // If we reach here, then we are not running as a service. Service installation exits
+ // directly and so never reaches here either.
+ }
+#endif
+
+ return runMongosServer(serviceContext);
+}
+
+MONGO_INITIALIZER_GENERAL(ForkServer, ("EndStartupOptionHandling"), ("default"))
+(InitializerContext* context) {
+ forkServerOrDie();
+ return Status::OK();
+}
+
+// Initialize the featureCompatibilityVersion server parameter since mongos does not have a
+// featureCompatibilityVersion document from which to initialize the parameter. The parameter is set
+// to the latest version because there is no feature gating that currently occurs at the mongos
+// level. The shards are responsible for rejecting usages of new features if their
+// featureCompatibilityVersion is lower.
+MONGO_INITIALIZER_WITH_PREREQUISITES(SetFeatureCompatibilityVersionLatest,
+ ("EndStartupOptionStorage"))
+(InitializerContext* context) {
+ serverGlobalParams.featureCompatibility.setVersion(
+ ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo46);
+ return Status::OK();
+}
+
+#ifdef MONGO_CONFIG_SSL
+MONGO_INITIALIZER_GENERAL(setSSLManagerType, MONGO_NO_PREREQUISITES, ("SSLManager"))
+(InitializerContext* context) {
+ isSSLServer = true;
+ return Status::OK();
+}
+#endif
+
+} // namespace
+
+ExitCode mongos_main(int argc, char* argv[], char** envp) {
+ setMongos();
+
+ if (argc < 1)
+ return EXIT_BADOPTIONS;
+
+ setupSignalHandlers();
+
+ Status status = runGlobalInitializers(argc, argv, envp);
+ if (!status.isOK()) {
+ LOGV2_FATAL_OPTIONS(
+ 22865,
+ logv2::LogOptions(logv2::LogComponent::kDefault, logv2::FatalMode::kContinue),
+ "Error during global initialization: {error}",
+ "Error during global initialization",
+ "error"_attr = status);
+ return EXIT_ABRUPT;
+ }
+
+ try {
+ setGlobalServiceContext(ServiceContext::make());
+ } catch (...) {
+ auto cause = exceptionToStatus();
+ LOGV2_FATAL_OPTIONS(
+ 22866,
+ logv2::LogOptions(logv2::LogComponent::kDefault, logv2::FatalMode::kContinue),
+ "Error creating service context: {error}",
+ "Error creating service context",
+ "error"_attr = redact(cause));
+ return EXIT_ABRUPT;
+ }
+
+ registerShutdownTask(cleanupTask);
+
+ const auto service = getGlobalServiceContext();
+
+ ErrorExtraInfo::invariantHaveAllParsers();
+
+ startupConfigActions(std::vector<std::string>(argv, argv + argc));
+ cmdline_utils::censorArgvArray(argc, argv);
+
+ logCommonStartupWarnings(serverGlobalParams);
+
+ try {
+ if (!initializeServerGlobalState(service))
+ return EXIT_ABRUPT;
+
+ if (!initializeServerSecurityGlobalState(service))
+ quickExit(EXIT_FAILURE);
+
+ startSignalProcessingThread();
+
+ return main(service);
+ } catch (const DBException& e) {
+ LOGV2_ERROR(22862,
+ "uncaught DBException in mongos main: {error}",
+ "uncaught DBException in mongos main",
+ "error"_attr = redact(e));
+ return EXIT_UNCAUGHT;
+ } catch (const std::exception& e) {
+ LOGV2_ERROR(22863,
+ "uncaught std::exception in mongos main: {error}",
+ "uncaught std::exception in mongos main",
+ "error"_attr = redact(e.what()));
+ return EXIT_UNCAUGHT;
+ } catch (...) {
+ LOGV2_ERROR(22864, "uncaught unknown exception in mongos main");
+ return EXIT_UNCAUGHT;
+ }
+}
+
+} // namespace mongo