diff options
-rw-r--r-- | src/mongo/db/s/sharding_initialization_op_observer_test.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator_futures_util.cpp | 51 | ||||
-rw-r--r-- | src/mongo/s/shard_server_test_fixture.cpp | 5 | ||||
-rw-r--r-- | src/mongo/s/shard_server_test_fixture.h | 3 |
4 files changed, 62 insertions, 7 deletions
diff --git a/src/mongo/db/s/sharding_initialization_op_observer_test.cpp b/src/mongo/db/s/sharding_initialization_op_observer_test.cpp index 37f61b19653..8fe50592f40 100644 --- a/src/mongo/db/s/sharding_initialization_op_observer_test.cpp +++ b/src/mongo/db/s/sharding_initialization_op_observer_test.cpp @@ -46,7 +46,7 @@ #include "mongo/s/catalog/dist_lock_manager_mock.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/config_server_catalog_cache_loader.h" -#include "mongo/s/shard_server_test_fixture.h" +#include "mongo/s/sharding_mongod_test_fixture.h" namespace mongo { namespace { @@ -55,13 +55,13 @@ const std::string kShardName("TestShard"); /** * This test suite validates that when the default OpObserver chain is set up (which happens to - * include the ShardServerOpObserver), writes to the 'admin.system.version' collection (and the + * include the ShardingMongodOpObserver), writes to the 'admin.system.version' collection (and the * shardIdentity document specifically) will invoke the sharding initialization code. */ -class ShardingInitializationOpObserverTest : public ShardServerTestFixture { +class ShardingInitializationOpObserverTest : public ShardingMongodTestFixture { public: void setUp() override { - ShardServerTestFixture::setUp(); + ShardingMongodTestFixture::setUp(); // NOTE: this assumes that globalInit will always be called on the same thread as the main // test thread @@ -77,7 +77,7 @@ public: void tearDown() override { ShardingState::get(getServiceContext())->clearForTests(); - ShardServerTestFixture::tearDown(); + ShardingMongodTestFixture::tearDown(); } int getInitCallCount() const { diff --git a/src/mongo/db/transaction_coordinator_futures_util.cpp b/src/mongo/db/transaction_coordinator_futures_util.cpp index f072d8b330b..ad72139ccc3 100644 --- a/src/mongo/db/transaction_coordinator_futures_util.cpp +++ b/src/mongo/db/transaction_coordinator_futures_util.cpp @@ -36,6 +36,9 @@ #include "mongo/client/remote_command_retry_scheduler.h" #include "mongo/client/remote_command_targeter.h" +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/s/sharding_state.h" +#include "mongo/transport/service_entry_point.h" #include "mongo/util/log.h" namespace mongo { @@ -55,6 +58,50 @@ AsyncWorkScheduler::~AsyncWorkScheduler() = default; Future<executor::TaskExecutor::ResponseStatus> AsyncWorkScheduler::scheduleRemoteCommand( const ShardId& shardId, const ReadPreferenceSetting& readPref, const BSONObj& commandObj) { + + bool isSelfShard = [this, shardId] { + if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { + return shardId == ShardRegistry::kConfigServerShardId; + } + if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { + return shardId == ShardingState::get(_serviceContext)->shardId(); + } + MONGO_UNREACHABLE; // Only sharded systems should use the two-phase commit path. + }(); + + if (isSelfShard) { + // If sending a command to the same shard as this node is in, send it directly to this node + // rather than going through the host targeting below. This ensures that the state changes + // for the participant and coordinator occur sequentially on a single branch of replica set + // history. See SERVER-38142 for details. + return scheduleWork([ shardId, + commandObj = commandObj.getOwned() ](OperationContext * opCtx) { + // Note: This internal authorization is tied to the lifetime of 'opCtx', which is + // destroyed by 'scheduleWork' immediately after this lambda ends. + AuthorizationSession::get(Client::getCurrent())->grantInternalAuthorization(); + + LOG(3) << "Coordinator going to send command " << commandObj << " to shard " << shardId; + + auto start = Date_t::now(); + + auto requestOpMsg = + OpMsgRequest::fromDBAndBody(NamespaceString::kAdminDb, commandObj).serialize(); + const auto replyOpMsg = OpMsg::parseOwned(opCtx->getServiceContext() + ->getServiceEntryPoint() + ->handleRequest(opCtx, requestOpMsg) + .response); + + // Document sequences are not yet being used for responses. + invariant(replyOpMsg.sequences.empty()); + + // 'ResponseStatus' is the response format of a remote request sent over the network, so + // we simulate that format manually here, since we sent the request over the loopback. + return ResponseStatus{replyOpMsg.body.getOwned(), Date_t::now() - start}; + }); + } + + // Manually simulate a futures interface to the TaskExecutor by creating this promise-future + // pair and setting the promise from inside the callback passed to the TaskExecutor. auto promiseAndFuture = makePromiseFuture<ResponseStatus>(); auto sharedPromise = std::make_shared<Promise<ResponseStatus>>(std::move(promiseAndFuture.promise)); @@ -93,8 +140,8 @@ Future<executor::TaskExecutor::ResponseStatus> AsyncWorkScheduler::scheduleRemot }) .getAsync([](Status) {}); - // Do not wait for the callback to run. The callback will reschedule the remote request on - // the same executor if necessary. + // Do not wait for the callback to run. The continuation on the future corresponding to + // 'sharedPromise' will reschedule the remote request if necessary. return std::move(promiseAndFuture.future); } diff --git a/src/mongo/s/shard_server_test_fixture.cpp b/src/mongo/s/shard_server_test_fixture.cpp index e182177760d..ea1148167d6 100644 --- a/src/mongo/s/shard_server_test_fixture.cpp +++ b/src/mongo/s/shard_server_test_fixture.cpp @@ -36,6 +36,7 @@ #include "mongo/db/commands.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/s/shard_server_catalog_cache_loader.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/s/catalog/dist_lock_catalog_mock.h" #include "mongo/s/catalog/dist_lock_manager_mock.h" #include "mongo/s/catalog/sharding_catalog_client_impl.h" @@ -61,11 +62,15 @@ std::shared_ptr<RemoteCommandTargeterMock> ShardServerTestFixture::configTargete void ShardServerTestFixture::setUp() { ShardingMongodTestFixture::setUp(); + replicationCoordinator()->alwaysAllowWrites(true); // Initialize sharding components as a shard server. serverGlobalParams.clusterRole = ClusterRole::ShardServer; + _clusterId = OID::gen(); + ShardingState::get(getServiceContext())->setInitialized(_myShardName, _clusterId); + CatalogCacheLoader::set(getServiceContext(), stdx::make_unique<ShardServerCatalogCacheLoader>( stdx::make_unique<ConfigServerCatalogCacheLoader>())); diff --git a/src/mongo/s/shard_server_test_fixture.h b/src/mongo/s/shard_server_test_fixture.h index f78744c90a4..979e287dfbf 100644 --- a/src/mongo/s/shard_server_test_fixture.h +++ b/src/mongo/s/shard_server_test_fixture.h @@ -79,6 +79,9 @@ protected: */ std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient( std::unique_ptr<DistLockManager> distLockManager) override; + + const ShardId _myShardName{"myShardName"}; + OID _clusterId; }; } // namespace mongo |