author     Kaloian Manassiev <kaloian.manassiev@mongodb.com>    2018-10-12 11:22:34 +0200
committer  Kaloian Manassiev <kaloian.manassiev@mongodb.com>    2018-10-13 18:58:05 +0200
commit     aa89fef4ac12249077ff8701b465d0b9f733fd2c (patch)
tree       9aa39aeb38760f3d6f9c4b4967f0fc437bd7d518 /src/mongo/s/client
parent     89b1e783fa3d5aac034b5af51ef6c1732bef5310 (diff)
download   mongo-aa89fef4ac12249077ff8701b465d0b9f733fd2c.tar.gz
SERVER-37349 Ensure that commands with transactions do not use the VersionManager
Diffstat (limited to 'src/mongo/s/client')
-rw-r--r--  src/mongo/s/client/parallel.cpp              |  8
-rw-r--r--  src/mongo/s/client/parallel.h                |  2
-rw-r--r--  src/mongo/s/client/shard_connection.cpp      | 10
-rw-r--r--  src/mongo/s/client/shard_connection.h        |  3
-rw-r--r--  src/mongo/s/client/shard_connection_test.cpp | 53
-rw-r--r--  src/mongo/s/client/version_manager.cpp       |  4
6 files changed, 47 insertions, 33 deletions
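In brief: every ShardConnection construction now threads the caller's OperationContext through, and both the ShardConnection constructor and the VersionManager's setShardVersion() invariant that no TransactionRouter (i.e. no cross-shard transaction) is attached to that operation. A minimal sketch of a call site under the new signature; the host string and namespace are placeholders modeled on the test fixture further down, and opCtx is assumed to be the caller's OperationContext*.

    // Call-site shape after this change: the OperationContext now comes first.
    ShardConnection conn(opCtx,
                         ConnectionString(HostAndPort("shard-host:27017")),  // placeholder host
                         "test.user");                                       // namespace, as in the tests
    DBClientBase* raw = conn.get();  // checked-out pooled connection
    // ... run commands on 'raw' ...
    conn.done();                     // return the connection to the shard connection pool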
diff --git a/src/mongo/s/client/parallel.cpp b/src/mongo/s/client/parallel.cpp
index a9f2c60eee7..ba678d549c0 100644
--- a/src/mongo/s/client/parallel.cpp
+++ b/src/mongo/s/client/parallel.cpp
@@ -218,7 +218,7 @@ void ParallelSortClusteredCursor::init(OperationContext* opCtx) {
} else {
// You can only get here by using the legacy constructor
// TODO: Eliminate this
- _oldInit();
+ _oldInit(opCtx);
}
}
@@ -351,7 +351,7 @@ void ParallelSortClusteredCursor::setupVersionAndHandleSlaveOk(
if (!state->conn) {
const auto shard =
uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId));
- state->conn.reset(new ShardConnection(shard->getConnString(), ns.ns(), manager));
+ state->conn.reset(new ShardConnection(opCtx, shard->getConnString(), ns.ns(), manager));
}
const DBClientBase* rawConn = state->conn->getRawConn();
@@ -911,7 +911,7 @@ std::shared_ptr<DBClientCursor> ParallelSortClusteredCursor::getShardCursor(
}
// DEPRECATED (but still used by map/reduce)
-void ParallelSortClusteredCursor::_oldInit() {
+void ParallelSortClusteredCursor::_oldInit(OperationContext* opCtx) {
// make sure we're not already initialized
verify(!_cursors);
_cursors = new DBClientCursorHolder[_numServers];
@@ -970,7 +970,7 @@ void ParallelSortClusteredCursor::_oldInit() {
// here
try {
conns.push_back(shared_ptr<ShardConnection>(new ShardConnection(
- uassertStatusOK(ConnectionString::parse(serverHost)), _ns)));
+ opCtx, uassertStatusOK(ConnectionString::parse(serverHost)), _ns)));
} catch (std::exception& e) {
socketExs.push_back(e.what() + errLoc);
if (!returnPartial) {
diff --git a/src/mongo/s/client/parallel.h b/src/mongo/s/client/parallel.h
index 4e740c93c39..ccc11d375a5 100644
--- a/src/mongo/s/client/parallel.h
+++ b/src/mongo/s/client/parallel.h
@@ -155,7 +155,7 @@ private:
std::shared_ptr<ChunkManager> manager /* in */);
// LEGACY init - Needed for map reduce
- void _oldInit();
+ void _oldInit(OperationContext* opCtx);
// LEGACY - Needed ONLY for _oldInit
std::string _ns;
diff --git a/src/mongo/s/client/shard_connection.cpp b/src/mongo/s/client/shard_connection.cpp
index 9d117a706af..98faaee1bbf 100644
--- a/src/mongo/s/client/shard_connection.cpp
+++ b/src/mongo/s/client/shard_connection.cpp
@@ -44,6 +44,7 @@
#include "mongo/s/cluster_last_error_info.h"
#include "mongo/s/grid.h"
#include "mongo/s/is_mongos.h"
+#include "mongo/s/transaction_router.h"
#include "mongo/util/concurrency/spin_lock.h"
#include "mongo/util/exit.h"
#include "mongo/util/log.h"
@@ -375,23 +376,28 @@ MONGO_INITIALIZER(InitializeShardedConnectionPool)(InitializerContext* context)
DBConnectionPool shardConnectionPool;
-ShardConnection::ShardConnection(const ConnectionString& connectionString,
+ShardConnection::ShardConnection(OperationContext* opCtx,
+ const ConnectionString& connectionString,
const std::string& ns,
std::shared_ptr<ChunkManager> manager)
: _cs(connectionString), _ns(ns), _manager(manager), _finishedInit(false) {
invariant(_cs.isValid());
+ // This code should never run under a cross-shard transaction
+ invariant(!TransactionRouter::get(opCtx));
+
// Make sure we specified a manager for the correct namespace
if (_ns.size() && _manager) {
invariant(_manager->getns().ns() == _ns);
}
auto csString = _cs.toString();
+
_conn = ClientConnections::threadInstance()->get(csString, _ns);
if (isMongos()) {
// In mongos, we record this connection as having been used for useful work to provide
// useful information in getLastError.
- ClusterLastErrorInfo::get(cc())->addShardHost(csString);
+ ClusterLastErrorInfo::get(opCtx->getClient())->addShardHost(csString);
}
}
diff --git a/src/mongo/s/client/shard_connection.h b/src/mongo/s/client/shard_connection.h
index bfe3237e434..ac869a779e6 100644
--- a/src/mongo/s/client/shard_connection.h
+++ b/src/mongo/s/client/shard_connection.h
@@ -48,7 +48,8 @@ public:
* setShardVersion command will be invoked to initialize the remote shard. Otherwise, the
* chunk manager will be used to obtain the shard version to set on the connection.
*/
- ShardConnection(const ConnectionString& connectionString,
+ ShardConnection(OperationContext* opCtx,
+ const ConnectionString& connectionString,
const std::string& ns,
std::shared_ptr<ChunkManager> manager = nullptr);
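The header comment above distinguishes the two construction modes, and both call sites appear in the parallel.cpp hunks of this commit. A condensed illustration of the two shapes after the signature change (names as in those hunks, error handling elided):

    // With a ChunkManager: the manager supplies the shard version to set on the connection.
    state->conn.reset(new ShardConnection(opCtx, shard->getConnString(), ns.ns(), manager));

    // Without a manager (the legacy _oldInit() path): setShardVersion initializes the shard.
    conns.push_back(std::shared_ptr<ShardConnection>(new ShardConnection(
        opCtx, uassertStatusOK(ConnectionString::parse(serverHost)), _ns)));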
diff --git a/src/mongo/s/client/shard_connection_test.cpp b/src/mongo/s/client/shard_connection_test.cpp
index 1fa07c1dc2f..af5fb706346 100644
--- a/src/mongo/s/client/shard_connection_test.cpp
+++ b/src/mongo/s/client/shard_connection_test.cpp
@@ -111,7 +111,7 @@ protected:
std::vector<std::unique_ptr<ShardConnection>> newConnList;
for (size_t x = 0; x < newConnsToCreate; x++) {
auto newConn = std::make_unique<ShardConnection>(
- ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
+ _opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
checkFunc(newConn->get()->getSockCreationMicroSec(), arg2);
newConnList.emplace_back(std::move(newConn));
}
@@ -127,7 +127,7 @@ protected:
// Check that connections created after the purge was put back to the pool.
for (size_t x = 0; x < newConnsToCreate; x++) {
auto newConn = std::make_unique<ShardConnection>(
- ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
+ _opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
ASSERT_LESS_THAN(newConn->get()->getSockCreationMicroSec(), oldCreationTime);
newConnList.emplace_back(std::move(newConn));
}
@@ -137,19 +137,22 @@ protected:
}
}
+ const ServiceContext::UniqueOperationContext _uniqueOpCtx = makeOperationContext();
+ OperationContext* const _opCtx = _uniqueOpCtx.get();
+
private:
MockRemoteDBServer* _dummyServer;
uint32_t _maxPoolSizePerHost;
};
TEST_F(ShardConnFixture, BasicShardConnection) {
- ShardConnection conn1(ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
- ShardConnection conn2(ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
+ ShardConnection conn1(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
+ ShardConnection conn2(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
DBClientBase* conn1Ptr = conn1.get();
conn1.done();
- ShardConnection conn3(ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
+ ShardConnection conn3(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
ASSERT_EQUALS(conn1Ptr, conn3.get());
conn2.done();
@@ -157,9 +160,9 @@ TEST_F(ShardConnFixture, BasicShardConnection) {
}
TEST_F(ShardConnFixture, InvalidateBadConnInPool) {
- ShardConnection conn1(ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
- ShardConnection conn2(ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
- ShardConnection conn3(ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
+ ShardConnection conn1(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
+ ShardConnection conn2(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
+ ShardConnection conn3(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
conn1.done();
conn3.done();
@@ -179,9 +182,9 @@ TEST_F(ShardConnFixture, InvalidateBadConnInPool) {
}
TEST_F(ShardConnFixture, DontReturnKnownBadConnToPool) {
- ShardConnection conn1(ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
- ShardConnection conn2(ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
- ShardConnection conn3(ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
+ ShardConnection conn1(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
+ ShardConnection conn2(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
+ ShardConnection conn3(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
conn1.done();
killServer();
@@ -202,9 +205,9 @@ TEST_F(ShardConnFixture, DontReturnKnownBadConnToPool) {
}
TEST_F(ShardConnFixture, BadConnClearsPoolWhenKilled) {
- ShardConnection conn1(ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
- ShardConnection conn2(ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
- ShardConnection conn3(ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
+ ShardConnection conn1(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
+ ShardConnection conn2(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
+ ShardConnection conn3(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
conn1.done();
killServer();
@@ -225,9 +228,9 @@ TEST_F(ShardConnFixture, BadConnClearsPoolWhenKilled) {
}
TEST_F(ShardConnFixture, KilledGoodConnShouldNotClearPool) {
- ShardConnection conn1(ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
- ShardConnection conn2(ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
- ShardConnection conn3(ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
+ ShardConnection conn1(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
+ ShardConnection conn2(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
+ ShardConnection conn3(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
const uint64_t upperBoundCreationTime = conn3.get()->getSockCreationMicroSec();
conn3.done();
@@ -237,8 +240,8 @@ TEST_F(ShardConnFixture, KilledGoodConnShouldNotClearPool) {
conn2.done();
- ShardConnection conn4(ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
- ShardConnection conn5(ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
+ ShardConnection conn4(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
+ ShardConnection conn5(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
ASSERT_GREATER_THAN(conn4.get()->getSockCreationMicroSec(), badCreationTime);
ASSERT_LESS_THAN_OR_EQUALS(conn4.get()->getSockCreationMicroSec(), upperBoundCreationTime);
@@ -252,9 +255,9 @@ TEST_F(ShardConnFixture, KilledGoodConnShouldNotClearPool) {
TEST_F(ShardConnFixture, InvalidateBadConnEvenWhenPoolIsFull) {
mongo::shardConnectionPool.setMaxPoolSize(2);
- ShardConnection conn1(ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
- ShardConnection conn2(ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
- ShardConnection conn3(ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
+ ShardConnection conn1(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
+ ShardConnection conn2(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
+ ShardConnection conn3(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
conn1.done();
conn3.done();
@@ -274,13 +277,13 @@ TEST_F(ShardConnFixture, InvalidateBadConnEvenWhenPoolIsFull) {
}
TEST_F(ShardConnFixture, DontReturnConnGoneBadToPool) {
- ShardConnection conn1(ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
+ ShardConnection conn1(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
const uint64_t conn1CreationTime = conn1.get()->getSockCreationMicroSec();
uint64_t conn2CreationTime = 0;
{
- ShardConnection conn2(ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
+ ShardConnection conn2(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
conn2CreationTime = conn2.get()->getSockCreationMicroSec();
conn1.done();
@@ -291,7 +294,7 @@ TEST_F(ShardConnFixture, DontReturnConnGoneBadToPool) {
// also not invalidate older connections since it didn't encounter
// a socket exception.
- ShardConnection conn1Again(ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
+ ShardConnection conn1Again(_opCtx, ConnectionString(HostAndPort(TARGET_HOST)), "test.user");
ASSERT_EQUALS(conn1CreationTime, conn1Again.get()->getSockCreationMicroSec());
checkNewConns(assertNotEqual, conn2CreationTime, 10);
diff --git a/src/mongo/s/client/version_manager.cpp b/src/mongo/s/client/version_manager.cpp
index 0d9e4ee897f..abb401cea59 100644
--- a/src/mongo/s/client/version_manager.cpp
+++ b/src/mongo/s/client/version_manager.cpp
@@ -42,6 +42,7 @@
#include "mongo/s/is_mongos.h"
#include "mongo/s/request_types/set_shard_version_request.h"
#include "mongo/s/stale_exception.h"
+#include "mongo/s/transaction_router.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -113,6 +114,9 @@ bool setShardVersion(OperationContext* opCtx,
ChunkManager* manager,
bool authoritative,
BSONObj& result) {
+ // This code should never run under a cross-shard transaction
+ invariant(!TransactionRouter::get(opCtx));
+
ShardId shardId;
ConnectionString shardCS;
{
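The same guard now sits at the top of setShardVersion(), so any command that reaches the VersionManager while a cross-shard transaction is active trips an invariant rather than silently versioning the connection. A hypothetical caller-side check of the same condition, illustrative only and not part of this commit (shardConnString and nss are assumed placeholders for a ConnectionString and a NamespaceString):

    // Sketch: pick the code path based on whether a cross-shard transaction is active.
    if (TransactionRouter::get(opCtx)) {
        // The transaction machinery attaches shard versions itself; taking the legacy
        // ShardConnection/VersionManager path here would now hit the new invariants.
    } else {
        ShardConnection conn(opCtx, shardConnString, nss.ns());  // legacy, versioned path
        // ... use conn ...
        conn.done();
    }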