author	Esha Maharishi <esha.maharishi@mongodb.com>	2019-01-07 23:15:19 -0500
committer	Esha Maharishi <esha.maharishi@mongodb.com>	2019-01-16 14:49:54 -0500
commit	bd1bcfbaad651070eba163a8714762ae7ed60906 (patch)
tree	2ee2b3b4d007139e89cf8b3156f93b677a766757
parent	9c49d721526ac83ada34950841ceef5b0b48c3c5 (diff)
download	mongo-bd1bcfbaad651070eba163a8714762ae7ed60906.tar.gz
SERVER-38412 Transaction coordinator should send prepare, commit, and abort directly to its local participant
-rw-r--r--	src/mongo/db/s/sharding_initialization_op_observer_test.cpp	10
-rw-r--r--	src/mongo/db/transaction_coordinator_futures_util.cpp	51
-rw-r--r--	src/mongo/s/shard_server_test_fixture.cpp	5
-rw-r--r--	src/mongo/s/shard_server_test_fixture.h	3
4 files changed, 62 insertions, 7 deletions
diff --git a/src/mongo/db/s/sharding_initialization_op_observer_test.cpp b/src/mongo/db/s/sharding_initialization_op_observer_test.cpp
index 37f61b19653..8fe50592f40 100644
--- a/src/mongo/db/s/sharding_initialization_op_observer_test.cpp
+++ b/src/mongo/db/s/sharding_initialization_op_observer_test.cpp
@@ -46,7 +46,7 @@
#include "mongo/s/catalog/dist_lock_manager_mock.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/config_server_catalog_cache_loader.h"
-#include "mongo/s/shard_server_test_fixture.h"
+#include "mongo/s/sharding_mongod_test_fixture.h"
namespace mongo {
namespace {
@@ -55,13 +55,13 @@ const std::string kShardName("TestShard");
/**
* This test suite validates that when the default OpObserver chain is set up (which happens to
* include the ShardServerOpObserver), writes to the 'admin.system.version' collection (and the
* shardIdentity document specifically) will invoke the sharding initialization code.
*/
-class ShardingInitializationOpObserverTest : public ShardServerTestFixture {
+class ShardingInitializationOpObserverTest : public ShardingMongodTestFixture {
public:
void setUp() override {
- ShardServerTestFixture::setUp();
+ ShardingMongodTestFixture::setUp();
// NOTE: this assumes that globalInit will always be called on the same thread as the main
// test thread
@@ -77,7 +77,7 @@ public:
void tearDown() override {
ShardingState::get(getServiceContext())->clearForTests();
- ShardServerTestFixture::tearDown();
+ ShardingMongodTestFixture::tearDown();
}
int getInitCallCount() const {
diff --git a/src/mongo/db/transaction_coordinator_futures_util.cpp b/src/mongo/db/transaction_coordinator_futures_util.cpp
index f072d8b330b..ad72139ccc3 100644
--- a/src/mongo/db/transaction_coordinator_futures_util.cpp
+++ b/src/mongo/db/transaction_coordinator_futures_util.cpp
@@ -36,6 +36,9 @@
#include "mongo/client/remote_command_retry_scheduler.h"
#include "mongo/client/remote_command_targeter.h"
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/s/sharding_state.h"
+#include "mongo/transport/service_entry_point.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -55,6 +58,50 @@ AsyncWorkScheduler::~AsyncWorkScheduler() = default;
Future<executor::TaskExecutor::ResponseStatus> AsyncWorkScheduler::scheduleRemoteCommand(
const ShardId& shardId, const ReadPreferenceSetting& readPref, const BSONObj& commandObj) {
+
+ bool isSelfShard = [this, shardId] {
+ if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
+ return shardId == ShardRegistry::kConfigServerShardId;
+ }
+ if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
+ return shardId == ShardingState::get(_serviceContext)->shardId();
+ }
+ MONGO_UNREACHABLE; // Only sharded systems should use the two-phase commit path.
+ }();
+
+ if (isSelfShard) {
+ // If the command is addressed to the shard this node belongs to, send it directly to this
+ // node rather than going through the host targeting below. This ensures that the state changes
+ // for the participant and coordinator occur sequentially on a single branch of replica set
+ // history. See SERVER-38142 for details.
+ return scheduleWork([ shardId,
+ commandObj = commandObj.getOwned() ](OperationContext * opCtx) {
+ // Note: This internal authorization is tied to the lifetime of 'opCtx', which is
+ // destroyed by 'scheduleWork' immediately after this lambda ends.
+ AuthorizationSession::get(Client::getCurrent())->grantInternalAuthorization();
+
+ LOG(3) << "Coordinator going to send command " << commandObj << " to shard " << shardId;
+
+ auto start = Date_t::now();
+
+ auto requestOpMsg =
+ OpMsgRequest::fromDBAndBody(NamespaceString::kAdminDb, commandObj).serialize();
+ const auto replyOpMsg = OpMsg::parseOwned(opCtx->getServiceContext()
+ ->getServiceEntryPoint()
+ ->handleRequest(opCtx, requestOpMsg)
+ .response);
+
+ // Document sequences are not yet being used for responses.
+ invariant(replyOpMsg.sequences.empty());
+
+ // 'ResponseStatus' is the response format of a remote request sent over the network, so
+ // we simulate that format manually here, since we sent the request over the loopback.
+ return ResponseStatus{replyOpMsg.body.getOwned(), Date_t::now() - start};
+ });
+ }
+
+ // Manually simulate a futures interface to the TaskExecutor by creating this promise-future
+ // pair and setting the promise from inside the callback passed to the TaskExecutor.
auto promiseAndFuture = makePromiseFuture<ResponseStatus>();
auto sharedPromise =
std::make_shared<Promise<ResponseStatus>>(std::move(promiseAndFuture.promise));
@@ -93,8 +140,8 @@ Future<executor::TaskExecutor::ResponseStatus> AsyncWorkScheduler::scheduleRemot
})
.getAsync([](Status) {});
- // Do not wait for the callback to run. The callback will reschedule the remote request on
- // the same executor if necessary.
+ // Do not wait for the callback to run. The continuation on the future corresponding to
+ // 'sharedPromise' will reschedule the remote request if necessary.
return std::move(promiseAndFuture.future);
}
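
The change above combines two patterns: routing a command addressed to this node's own shard through the local ServiceEntryPoint (the loopback), and bridging the TaskExecutor's callback interface into a Future by setting a shared Promise from inside the callback. Below is a condensed, standalone model of both patterns built on std::future; every name in it (Response, handleLocally, scheduleRemote, scheduleCommand) is a hypothetical stand-in for the MongoDB types in the diff, so read it as a sketch of the idea rather than the actual implementation.

    // Standalone model of the two patterns above, not MongoDB's API; every name
    // here is a hypothetical stand-in for a type or function in the diff.
    #include <chrono>
    #include <functional>
    #include <future>
    #include <iostream>
    #include <memory>
    #include <string>
    #include <thread>

    struct Response {
        std::string body;
        std::chrono::milliseconds elapsed;
    };

    // Stand-in for ServiceEntryPoint::handleRequest: runs the command in-process.
    Response handleLocally(const std::string& command) {
        const auto start = std::chrono::steady_clock::now();
        std::string reply = "ok:" + command;  // Pretend to execute the command.
        const auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::steady_clock::now() - start);
        return Response{reply, elapsed};
    }

    // Stand-in for the TaskExecutor: delivers the remote reply to a callback.
    void scheduleRemote(const std::string& command, std::function<void(Response)> callback) {
        std::thread([command, callback = std::move(callback)] {
            callback(Response{"remote-ok:" + command, std::chrono::milliseconds{0}});
        }).detach();
    }

    std::future<Response> scheduleCommand(const std::string& shardId,
                                          const std::string& selfShardId,
                                          const std::string& command) {
        if (shardId == selfShardId) {
            // Self-shard: run the command over the loopback so the participant's
            // and the coordinator's state changes land on one branch of history.
            return std::async(std::launch::async, handleLocally, command);
        }
        // Remote shard: bridge the callback interface into a future by setting a
        // shared promise from inside the executor's callback.
        auto promise = std::make_shared<std::promise<Response>>();
        auto future = promise->get_future();
        scheduleRemote(command, [promise](Response r) { promise->set_value(std::move(r)); });
        return future;
    }

    int main() {
        auto local = scheduleCommand("shardA", "shardA", "prepareTransaction");
        auto remote = scheduleCommand("shardB", "shardA", "prepareTransaction");
        std::cout << local.get().body << "\n" << remote.get().body << "\n";
        return 0;
    }

The self-shard branch mirrors the reasoning in the diff's comment: executing the command in-process keeps the participant's and the coordinator's state changes on a single branch of replica set history, instead of racing through host targeting and the network stack.
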
diff --git a/src/mongo/s/shard_server_test_fixture.cpp b/src/mongo/s/shard_server_test_fixture.cpp
index e182177760d..ea1148167d6 100644
--- a/src/mongo/s/shard_server_test_fixture.cpp
+++ b/src/mongo/s/shard_server_test_fixture.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/commands.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/s/shard_server_catalog_cache_loader.h"
+#include "mongo/db/s/sharding_state.h"
#include "mongo/s/catalog/dist_lock_catalog_mock.h"
#include "mongo/s/catalog/dist_lock_manager_mock.h"
#include "mongo/s/catalog/sharding_catalog_client_impl.h"
@@ -61,11 +62,15 @@ std::shared_ptr<RemoteCommandTargeterMock> ShardServerTestFixture::configTargete
void ShardServerTestFixture::setUp() {
ShardingMongodTestFixture::setUp();
+
replicationCoordinator()->alwaysAllowWrites(true);
// Initialize sharding components as a shard server.
serverGlobalParams.clusterRole = ClusterRole::ShardServer;
+ _clusterId = OID::gen();
+ ShardingState::get(getServiceContext())->setInitialized(_myShardName, _clusterId);
+
CatalogCacheLoader::set(getServiceContext(),
stdx::make_unique<ShardServerCatalogCacheLoader>(
stdx::make_unique<ConfigServerCatalogCacheLoader>()));
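
With the fixture now marking the shard as initialized, tests built on ShardServerTestFixture can rely on ShardingState reporting an initialized shard, which is what the self-shard check in scheduleRemoteCommand above depends on. A hypothetical test along these lines (the test name is invented, and the exact assertions are a sketch against the fixture as of this commit, not code from the tree):

    #include "mongo/db/s/sharding_state.h"
    #include "mongo/s/shard_server_test_fixture.h"
    #include "mongo/unittest/unittest.h"

    namespace mongo {
    namespace {

    // Hypothetical test: after ShardServerTestFixture::setUp() runs, ShardingState
    // should report the shard as initialized under the fixture's shard name.
    TEST_F(ShardServerTestFixture, ShardingStateIsInitialized) {
        auto* const shardingState = ShardingState::get(getServiceContext());
        ASSERT_TRUE(shardingState->enabled());
        ASSERT_EQ(ShardId("myShardName"), shardingState->shardId());
    }

    }  // namespace
    }  // namespace mongo
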
diff --git a/src/mongo/s/shard_server_test_fixture.h b/src/mongo/s/shard_server_test_fixture.h
index f78744c90a4..979e287dfbf 100644
--- a/src/mongo/s/shard_server_test_fixture.h
+++ b/src/mongo/s/shard_server_test_fixture.h
@@ -79,6 +79,9 @@ protected:
*/
std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient(
std::unique_ptr<DistLockManager> distLockManager) override;
+
+ const ShardId _myShardName{"myShardName"};
+ OID _clusterId;
};
} // namespace mongo