diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2018-10-12 11:22:34 +0200 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2018-10-13 18:58:05 +0200 |
commit | aa89fef4ac12249077ff8701b465d0b9f733fd2c (patch) | |
tree | 9aa39aeb38760f3d6f9c4b4967f0fc437bd7d518 /src/mongo/s | |
parent | 89b1e783fa3d5aac034b5af51ef6c1732bef5310 (diff) | |
download | mongo-aa89fef4ac12249077ff8701b465d0b9f733fd2c.tar.gz |
SERVER-37349 Ensure that commands with transactions do not use the VersionManager
Diffstat (limited to 'src/mongo/s')
-rw-r--r-- | src/mongo/s/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/s/client/parallel.cpp | 8 | ||||
-rw-r--r-- | src/mongo/s/client/parallel.h | 2 | ||||
-rw-r--r-- | src/mongo/s/client/shard_connection.cpp | 10 | ||||
-rw-r--r-- | src/mongo/s/client/shard_connection.h | 3 | ||||
-rw-r--r-- | src/mongo/s/client/shard_connection_test.cpp | 53 | ||||
-rw-r--r-- | src/mongo/s/client/version_manager.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_map_reduce_cmd.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_reset_error_cmd.cpp | 2 | ||||
-rw-r--r-- | src/mongo/s/commands/strategy.cpp | 16 |
10 files changed, 65 insertions, 38 deletions
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index ebe11e262d9..45c8c91d881 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -46,6 +46,7 @@ env.Library( '$BUILD_DIR/mongo/db/lasterror', 'cluster_last_error_info', 'grid', + 'sharding_router_api', 'sharding_routing_table', ], ) diff --git a/src/mongo/s/client/parallel.cpp b/src/mongo/s/client/parallel.cpp index a9f2c60eee7..ba678d549c0 100644 --- a/src/mongo/s/client/parallel.cpp +++ b/src/mongo/s/client/parallel.cpp @@ -218,7 +218,7 @@ void ParallelSortClusteredCursor::init(OperationContext* opCtx) { } else { // You can only get here by using the legacy constructor // TODO: Eliminate this - _oldInit(); + _oldInit(opCtx); } } @@ -351,7 +351,7 @@ void ParallelSortClusteredCursor::setupVersionAndHandleSlaveOk( if (!state->conn) { const auto shard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId)); - state->conn.reset(new ShardConnection(shard->getConnString(), ns.ns(), manager)); + state->conn.reset(new ShardConnection(opCtx, shard->getConnString(), ns.ns(), manager)); } const DBClientBase* rawConn = state->conn->getRawConn(); @@ -911,7 +911,7 @@ std::shared_ptr<DBClientCursor> ParallelSortClusteredCursor::getShardCursor( } // DEPRECATED (but still used by map/reduce) -void ParallelSortClusteredCursor::_oldInit() { +void ParallelSortClusteredCursor::_oldInit(OperationContext* opCtx) { // make sure we're not already initialized verify(!_cursors); _cursors = new DBClientCursorHolder[_numServers]; @@ -970,7 +970,7 @@ void ParallelSortClusteredCursor::_oldInit() { // here try { conns.push_back(shared_ptr<ShardConnection>(new ShardConnection( - uassertStatusOK(ConnectionString::parse(serverHost)), _ns))); + opCtx, uassertStatusOK(ConnectionString::parse(serverHost)), _ns))); } catch (std::exception& e) { socketExs.push_back(e.what() + errLoc); if (!returnPartial) { diff --git a/src/mongo/s/client/parallel.h b/src/mongo/s/client/parallel.h index 4e740c93c39..ccc11d375a5 100644 --- a/src/mongo/s/client/parallel.h +++ b/src/mongo/s/client/parallel.h @@ -155,7 +155,7 @@ private: std::shared_ptr<ChunkManager> manager /* in */); // LEGACY init - Needed for map reduce - void _oldInit(); + void _oldInit(OperationContext* opCtx); // LEGACY - Needed ONLY for _oldInit std::string _ns; diff --git a/src/mongo/s/client/shard_connection.cpp b/src/mongo/s/client/shard_connection.cpp index 9d117a706af..98faaee1bbf 100644 --- a/src/mongo/s/client/shard_connection.cpp +++ b/src/mongo/s/client/shard_connection.cpp @@ -44,6 +44,7 @@ #include "mongo/s/cluster_last_error_info.h" #include "mongo/s/grid.h" #include "mongo/s/is_mongos.h" +#include "mongo/s/transaction_router.h" #include "mongo/util/concurrency/spin_lock.h" #include "mongo/util/exit.h" #include "mongo/util/log.h" @@ -375,23 +376,28 @@ MONGO_INITIALIZER(InitializeShardedConnectionPool)(InitializerContext* context) DBConnectionPool shardConnectionPool; -ShardConnection::ShardConnection(const ConnectionString& connectionString, +ShardConnection::ShardConnection(OperationContext* opCtx, + const ConnectionString& connectionString, const std::string& ns, std::shared_ptr<ChunkManager> manager) : _cs(connectionString), _ns(ns), _manager(manager), _finishedInit(false) { invariant(_cs.isValid()); + // This code should never run under a cross-shard transaction + invariant(!TransactionRouter::get(opCtx)); + // Make sure we specified a manager for the correct namespace if (_ns.size() && _manager) { invariant(_manager->getns().ns() == _ns); } auto csString = _cs.toString(); + _conn = ClientConnections::threadInstance()->get(csString, _ns); if (isMongos()) { // In mongos, we record this connection as having been used for useful work to provide // useful information in getLastError. - ClusterLastErrorInfo::get(cc())->addShardHost(csString); + ClusterLastErrorInfo::get(opCtx->getClient())->addShardHost(csString); } } diff --git a/src/mongo/s/client/shard_connection.h b/src/mongo/s/client/shard_connection.h index bfe3237e434..ac869a779e6 100644 --- a/src/mongo/s/client/shard_connection.h +++ b/src/mongo/s/client/shard_connection.h @@ -48,7 +48,8 @@ public: * setShardVersion command will be invoked to initialize the remote shard. Otherwise, the * chunk manager will be used to obtain the shard version to set on the connection. */ - ShardConnection(const ConnectionString& connectionString, + ShardConnection(OperationContext* opCtx, + const ConnectionString& connectionString, const std::string& ns, std::shared_ptr<ChunkManager> manager = nullptr); diff --git a/src/mongo/s/client/shard_connection_test.cpp b/src/mongo/s/client/shard_connection_test.cpp index 1fa07c1dc2f..af5fb706346 100644 --- a/src/mongo/s/client/shard_connection_test.cpp +++ b/src/mongo/s/client/shard_connection_test.cpp @@ -111,7 +111,7 @@ protected: std::vector<std::unique_ptr<ShardConnection>> newConnList; for (size_t x = 0; x < newConnsToCreate; x++) { auto newConn = std::make_unique<ShardConnection>( - ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); + _opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); checkFunc(newConn->get()->getSockCreationMicroSec(), arg2); newConnList.emplace_back(std::move(newConn)); } @@ -127,7 +127,7 @@ protected: // Check that connections created after the purge was put back to the pool. for (size_t x = 0; x < newConnsToCreate; x++) { auto newConn = std::make_unique<ShardConnection>( - ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); + _opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); ASSERT_LESS_THAN(newConn->get()->getSockCreationMicroSec(), oldCreationTime); newConnList.emplace_back(std::move(newConn)); } @@ -137,19 +137,22 @@ protected: } } + const ServiceContext::UniqueOperationContext _uniqueOpCtx = makeOperationContext(); + OperationContext* const _opCtx = _uniqueOpCtx.get(); + private: MockRemoteDBServer* _dummyServer; uint32_t _maxPoolSizePerHost; }; TEST_F(ShardConnFixture, BasicShardConnection) { - ShardConnection conn1(ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); - ShardConnection conn2(ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); + ShardConnection conn1(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); + ShardConnection conn2(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); DBClientBase* conn1Ptr = conn1.get(); conn1.done(); - ShardConnection conn3(ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); + ShardConnection conn3(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); ASSERT_EQUALS(conn1Ptr, conn3.get()); conn2.done(); @@ -157,9 +160,9 @@ TEST_F(ShardConnFixture, BasicShardConnection) { } TEST_F(ShardConnFixture, InvalidateBadConnInPool) { - ShardConnection conn1(ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); - ShardConnection conn2(ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); - ShardConnection conn3(ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); + ShardConnection conn1(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); + ShardConnection conn2(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); + ShardConnection conn3(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); conn1.done(); conn3.done(); @@ -179,9 +182,9 @@ TEST_F(ShardConnFixture, InvalidateBadConnInPool) { } TEST_F(ShardConnFixture, DontReturnKnownBadConnToPool) { - ShardConnection conn1(ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); - ShardConnection conn2(ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); - ShardConnection conn3(ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); + ShardConnection conn1(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); + ShardConnection conn2(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); + ShardConnection conn3(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); conn1.done(); killServer(); @@ -202,9 +205,9 @@ TEST_F(ShardConnFixture, DontReturnKnownBadConnToPool) { } TEST_F(ShardConnFixture, BadConnClearsPoolWhenKilled) { - ShardConnection conn1(ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); - ShardConnection conn2(ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); - ShardConnection conn3(ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); + ShardConnection conn1(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); + ShardConnection conn2(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); + ShardConnection conn3(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); conn1.done(); killServer(); @@ -225,9 +228,9 @@ TEST_F(ShardConnFixture, BadConnClearsPoolWhenKilled) { } TEST_F(ShardConnFixture, KilledGoodConnShouldNotClearPool) { - ShardConnection conn1(ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); - ShardConnection conn2(ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); - ShardConnection conn3(ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); + ShardConnection conn1(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); + ShardConnection conn2(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); + ShardConnection conn3(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); const uint64_t upperBoundCreationTime = conn3.get()->getSockCreationMicroSec(); conn3.done(); @@ -237,8 +240,8 @@ TEST_F(ShardConnFixture, KilledGoodConnShouldNotClearPool) { conn2.done(); - ShardConnection conn4(ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); - ShardConnection conn5(ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); + ShardConnection conn4(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); + ShardConnection conn5(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); ASSERT_GREATER_THAN(conn4.get()->getSockCreationMicroSec(), badCreationTime); ASSERT_LESS_THAN_OR_EQUALS(conn4.get()->getSockCreationMicroSec(), upperBoundCreationTime); @@ -252,9 +255,9 @@ TEST_F(ShardConnFixture, KilledGoodConnShouldNotClearPool) { TEST_F(ShardConnFixture, InvalidateBadConnEvenWhenPoolIsFull) { mongo::shardConnectionPool.setMaxPoolSize(2); - ShardConnection conn1(ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); - ShardConnection conn2(ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); - ShardConnection conn3(ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); + ShardConnection conn1(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); + ShardConnection conn2(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); + ShardConnection conn3(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); conn1.done(); conn3.done(); @@ -274,13 +277,13 @@ TEST_F(ShardConnFixture, InvalidateBadConnEvenWhenPoolIsFull) { } TEST_F(ShardConnFixture, DontReturnConnGoneBadToPool) { - ShardConnection conn1(ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); + ShardConnection conn1(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); const uint64_t conn1CreationTime = conn1.get()->getSockCreationMicroSec(); uint64_t conn2CreationTime = 0; { - ShardConnection conn2(ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); + ShardConnection conn2(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); conn2CreationTime = conn2.get()->getSockCreationMicroSec(); conn1.done(); @@ -291,7 +294,7 @@ TEST_F(ShardConnFixture, DontReturnConnGoneBadToPool) { // also not invalidate older connections since it didn't encounter // a socket exception. - ShardConnection conn1Again(ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); + ShardConnection conn1Again(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user"); ASSERT_EQUALS(conn1CreationTime, conn1Again.get()->getSockCreationMicroSec()); checkNewConns(assertNotEqual, conn2CreationTime, 10); diff --git a/src/mongo/s/client/version_manager.cpp b/src/mongo/s/client/version_manager.cpp index 0d9e4ee897f..abb401cea59 100644 --- a/src/mongo/s/client/version_manager.cpp +++ b/src/mongo/s/client/version_manager.cpp @@ -42,6 +42,7 @@ #include "mongo/s/is_mongos.h" #include "mongo/s/request_types/set_shard_version_request.h" #include "mongo/s/stale_exception.h" +#include "mongo/s/transaction_router.h" #include "mongo/util/log.h" namespace mongo { @@ -113,6 +114,9 @@ bool setShardVersion(OperationContext* opCtx, ChunkManager* manager, bool authoritative, BSONObj& result) { + // This code should never run under a cross-shard transaction + invariant(!TransactionRouter::get(opCtx)); + ShardId shardId; ConnectionString shardCS; { diff --git a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp index 72d6f6971d7..3e2321350ce 100644 --- a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp +++ b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp @@ -287,7 +287,7 @@ public: invariant(inputRoutingInfo.db().primary()); - ShardConnection conn(inputRoutingInfo.db().primary()->getConnString(), ""); + ShardConnection conn(opCtx, inputRoutingInfo.db().primary()->getConnString(), ""); BSONObj res; bool ok = conn->runCommand( @@ -449,7 +449,7 @@ public: const auto outputShard = uassertStatusOK(shardRegistry->getShard(opCtx, outputDbInfo.primaryId())); - ShardConnection conn(outputShard->getConnString(), outputCollNss.ns()); + ShardConnection conn(opCtx, outputShard->getConnString(), outputCollNss.ns()); ok = conn->runCommand( outDB, appendAllowImplicitCreate(finalCmd.obj(), true), singleResult); diff --git a/src/mongo/s/commands/cluster_reset_error_cmd.cpp b/src/mongo/s/commands/cluster_reset_error_cmd.cpp index 11da21626da..f4a0329f9fc 100644 --- a/src/mongo/s/commands/cluster_reset_error_cmd.cpp +++ b/src/mongo/s/commands/cluster_reset_error_cmd.cpp @@ -70,7 +70,7 @@ public: for (std::set<std::string>::const_iterator i = shards->begin(); i != shards->end(); i++) { const std::string shardName = *i; - ShardConnection conn(ConnectionString(shardName, ConnectionString::SET), ""); + ShardConnection conn(opCtx, ConnectionString(shardName, ConnectionString::SET), ""); BSONObj res; diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index 4ff6bdfe3e9..09defb7c760 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -430,7 +430,13 @@ void runCommand(OperationContext* opCtx, // Send setShardVersion on this thread's versioned connections to shards (to support // commands that use the legacy (ShardConnection) versioning protocol). - if (!MONGO_FAIL_POINT(doNotRefreshShardsOnRetargettingError)) { + // + // Versioned connections are a legacy concept, which is never used from code running + // under a transaction (see the invariant inside ShardConnection). Because of this, + // the retargeting error could not have come from a ShardConnection, so we don't + // need to reset the connection's in-memory state. + if (!MONGO_FAIL_POINT(doNotRefreshShardsOnRetargettingError) && + !TransactionRouter::get(opCtx)) { ShardConnection::checkMyConnectionVersions(opCtx, staleNs.ns()); } @@ -858,7 +864,13 @@ void Strategy::explainFind(OperationContext* opCtx, // Send setShardVersion on this thread's versioned connections to shards (to support // commands that use the legacy (ShardConnection) versioning protocol). - if (!MONGO_FAIL_POINT(doNotRefreshShardsOnRetargettingError)) { + // + // Versioned connections are a legacy concept, which is never used from code running + // under a transaction (see the invariant inside ShardConnection). Because of this, the + // retargeting error could not have come from a ShardConnection, so we don't need to + // reset the connection's in-memory state. + if (!MONGO_FAIL_POINT(doNotRefreshShardsOnRetargettingError) && + !TransactionRouter::get(opCtx)) { ShardConnection::checkMyConnectionVersions(opCtx, staleNs.ns()); } |