author     Kaloian Manassiev <kaloian.manassiev@mongodb.com>   2015-07-14 18:08:54 -0400
committer  Kaloian Manassiev <kaloian.manassiev@mongodb.com>   2015-07-21 14:39:26 -0400
commit     1bfa39c1d841e20cda3e198f57adf4eca0ebb653 (patch)
tree       785e9f2db674afeb319b4a09bd670563d18ca471
parent     923fcee186e7f1f4a908c9a8ff62fb30c23d5dc5 (diff)
download   mongo-1bfa39c1d841e20cda3e198f57adf4eca0ebb653.tar.gz
SERVER-19395 Write commands should use ShardConnection
With this change, all legacy-style (non-NetworkInterface) communication from mongos to a shard mongod goes through ShardConnection and will always send the shard initialization data (setShardVersion) at connection establishment.
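For illustration, a minimal before/after sketch of the core change in DBClientMultiCommand::sendAll(), excerpted and condensed from the diff below (error handling omitted; all names come from the patch itself):

    // Before: a raw pooled connection, which never triggers shard initialization.
    // int timeoutSecs = _timeoutMillis / 1000;
    // command->conn = shardConnectionPool.get(command->endpoint, timeoutSecs);

    // After: a ShardConnection, whose establishment ensures the shard
    // initialization data (setShardVersion) is sent before the command goes out.
    command->conn = stdx::make_unique<ShardConnection>(command->endpoint, "");
    sayAsCmd(command->conn->get(), command->dbName, command->cmdObj);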
-rw-r--r--  jstests/sharding/write_commands_sharding_state.js |  79
-rw-r--r--  src/mongo/client/scoped_db_conn_test.cpp           |   4
-rw-r--r--  src/mongo/dbtests/SConscript                       |   2
-rw-r--r--  src/mongo/dbtests/chunk_manager_tests.cpp          | 207
-rw-r--r--  src/mongo/dbtests/config_server_fixture.cpp        |   5
-rw-r--r--  src/mongo/dbtests/config_server_fixture.h          |   4
-rw-r--r--  src/mongo/dbtests/config_upgrade_tests.cpp         |   7
-rw-r--r--  src/mongo/dbtests/merge_chunk_tests.cpp            |   5
-rw-r--r--  src/mongo/dbtests/sharding.cpp                     | 259
-rw-r--r--  src/mongo/s/client/dbclient_multi_command.cpp      | 110
-rw-r--r--  src/mongo/s/client/dbclient_multi_command.h        |  14
-rw-r--r--  src/mongo/s/client/shard_connection.cpp            |  23
-rw-r--r--  src/mongo/s/client/shard_connection.h              |   4
-rw-r--r--  src/mongo/s/commands/commands_public.cpp           |   1
-rw-r--r--  src/mongo/s/server.cpp                             |   8
-rw-r--r--  src/mongo/s/version_manager.cpp                    |   6
-rw-r--r--  src/mongo/shell/shardingtest.js                    |  27
17 files changed, 389 insertions(+), 376 deletions(-)
diff --git a/jstests/sharding/write_commands_sharding_state.js b/jstests/sharding/write_commands_sharding_state.js
new file mode 100644
index 00000000000..99fbe92ee42
--- /dev/null
+++ b/jstests/sharding/write_commands_sharding_state.js
@@ -0,0 +1,79 @@
+(function() {
+
+'use strict';
+
+var st = new ShardingTest({name: "write_commands", mongos: 2, shards: 2 });
+st.stopBalancer();
+
+var dbTestName = 'WriteCommandsTestDB';
+
+assert.commandWorked(st.s0.adminCommand({ enablesharding: dbTestName }));
+st.ensurePrimaryShard(dbTestName, 'shard0000');
+
+assert.commandWorked(st.s0.adminCommand({ shardCollection: dbTestName + '.TestColl',
+ key: { Key: 1 },
+ unique: true }));
+
+// Split at keys 10 and 20
+assert.commandWorked(st.s0.adminCommand({ split: dbTestName + '.TestColl', middle: { Key: 10 } }));
+assert.commandWorked(st.s0.adminCommand({ split: dbTestName + '.TestColl', middle: { Key: 20 } }));
+
+printjson(st.config.getSiblingDB('config').chunks.find().toArray());
+
+// Move the chunk with keys < 10 to shard0000, and the chunks starting at 10 and 20 to shard0001
+st.s0.adminCommand({ moveChunk: dbTestName + '.TestColl', find: { Key: 0 }, to: 'shard0000' });
+st.s0.adminCommand({ moveChunk: dbTestName + '.TestColl', find: { Key: 19 }, to: 'shard0001' });
+st.s0.adminCommand({ moveChunk: dbTestName + '.TestColl', find: { Key: 21 }, to: 'shard0001' });
+
+printjson(st.config.getSiblingDB('config').chunks.find().toArray());
+
+// Insert one document in each chunk; these will be used to verify placement and one will be updated later
+assert(st.s1.getDB(dbTestName).TestColl.insert({ Key: 1 }));
+assert(st.s1.getDB(dbTestName).TestColl.insert({ Key: 11 }));
+assert(st.s1.getDB(dbTestName).TestColl.insert({ Key: 21 }));
+
+// Make sure the documents are correctly placed
+printjson(st.d0.getDB(dbTestName).TestColl.find().toArray());
+printjson(st.d1.getDB(dbTestName).TestColl.find().toArray());
+
+assert.eq(1, st.d0.getDB(dbTestName).TestColl.count());
+assert.eq(2, st.d1.getDB(dbTestName).TestColl.count());
+
+assert.eq(1, st.d0.getDB(dbTestName).TestColl.find({ Key: 1 }).count());
+assert.eq(1, st.d1.getDB(dbTestName).TestColl.find({ Key: 11 }).count());
+assert.eq(1, st.d1.getDB(dbTestName).TestColl.find({ Key: 21 }).count());
+
+// Move the chunk containing key 19 back to shard0000 and make sure the documents are correctly placed
+st.s0.adminCommand({ moveChunk: dbTestName + '.TestColl', find: { Key: 19 }, to: 'shard0000' });
+
+printjson(st.config.getSiblingDB('config').chunks.find().toArray());
+printjson(st.d0.getDB(dbTestName).TestColl.find({}).toArray());
+printjson(st.d1.getDB(dbTestName).TestColl.find({}).toArray());
+
+// Now restart all mongod instances, so they lose their in-memory sharding state
+st.restartMongod(0);
+st.restartMongod(1);
+
+// Now that both shard mongods have been restarted, they do not know that they are part of a
+// sharded cluster until they receive a setShardVersion command. Mongos instance s1 has stale
+// metadata and doesn't know that the chunk containing key 19 has moved to shard0000, so it
+// will send the update to shard0001 first.
+//
+// Shard0001 will only send back a stale config exception if it has received a setShardVersion
+// command. This test validates that setShardVersion is indeed being sent (for more
+// information, see SERVER-19395).
+st.s1.getDB(dbTestName).TestColl.update({ Key: 11 }, { $inc: { Counter: 1 } }, { upsert: true });
+
+printjson(st.d0.getDB(dbTestName).TestColl.find({}).toArray());
+printjson(st.d1.getDB(dbTestName).TestColl.find({}).toArray());
+
+assert.eq(2, st.d0.getDB(dbTestName).TestColl.count());
+assert.eq(1, st.d1.getDB(dbTestName).TestColl.count());
+
+assert.eq(1, st.d0.getDB(dbTestName).TestColl.find({ Key: 1 }).count());
+assert.eq(1, st.d0.getDB(dbTestName).TestColl.find({ Key: 11 }).count());
+assert.eq(1, st.d1.getDB(dbTestName).TestColl.find({ Key: 21 }).count());
+
+st.stop();
+
+})();
diff --git a/src/mongo/client/scoped_db_conn_test.cpp b/src/mongo/client/scoped_db_conn_test.cpp
index bfdc8dab4dc..59f02f20a1a 100644
--- a/src/mongo/client/scoped_db_conn_test.cpp
+++ b/src/mongo/client/scoped_db_conn_test.cpp
@@ -98,10 +98,6 @@ void exitCleanly(ExitCode rc) {
dbexit(rc, "");
}
-bool haveLocalShardingInfo(Client* client, const string& ns) {
- return false;
-}
-
namespace {
const string TARGET_HOST = "localhost:27017";
diff --git a/src/mongo/dbtests/SConscript b/src/mongo/dbtests/SConscript
index b87e4f5672c..45250818f15 100644
--- a/src/mongo/dbtests/SConscript
+++ b/src/mongo/dbtests/SConscript
@@ -45,6 +45,7 @@ dbtest = env.Program(
source=[
'basictests.cpp',
'chunktests.cpp',
+ 'chunk_manager_tests.cpp',
'clienttests.cpp',
'commandtests.cpp',
'config_server_fixture.cpp',
@@ -94,7 +95,6 @@ dbtest = env.Program(
'replica_set_monitor_test.cpp',
'repltests.cpp',
'rollbacktests.cpp',
- 'sharding.cpp',
'socktests.cpp',
'threadedtests.cpp',
'updatetests.cpp',
diff --git a/src/mongo/dbtests/chunk_manager_tests.cpp b/src/mongo/dbtests/chunk_manager_tests.cpp
new file mode 100644
index 00000000000..9d64c226df2
--- /dev/null
+++ b/src/mongo/dbtests/chunk_manager_tests.cpp
@@ -0,0 +1,207 @@
+/**
+ * Copyright (C) 2009 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/operation_context_impl.h"
+#include "mongo/dbtests/config_server_fixture.h"
+#include "mongo/dbtests/dbtests.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/catalog/type_collection.h"
+#include "mongo/s/catalog/type_shard.h"
+#include "mongo/s/chunk_manager.h"
+
+namespace mongo {
+
+using std::unique_ptr;
+using std::set;
+using std::string;
+using std::vector;
+
+namespace {
+
+static int rand(int max = -1) {
+ static unsigned seed = 1337;
+
+#if !defined(_WIN32)
+ int r = rand_r(&seed);
+#else
+ int r = ::rand(); // seed not used in this case
+#endif
+
+ // Modding is bad, but don't really care in this case
+ return max > 0 ? r % max : r;
+}
+
+/**
+ * Sets up a basic environment for loading chunks to/from the direct database connection. Redirects
+ * connections to the direct database for the duration of the test.
+ */
+class ChunkManagerFixture : public ConfigServerFixture {
+public:
+ void setUp() override {
+ ConfigServerFixture::setUp();
+
+ _client.dropDatabase(nsGetDB(_collName));
+ _client.insert(_collName,
+ BSON("hello"
+ << "world"));
+ _client.dropCollection(_collName);
+
+ // Add dummy shard to config DB
+ _client.insert(ShardType::ConfigNS,
+ BSON(ShardType::name()
+ << _shardId << ShardType::host()
+ << ConnectionString(HostAndPort("$hostFooBar:27017")).toString()));
+ }
+
+protected:
+ static const ShardId _shardId;
+ static const string _collName;
+
+ static const int numSplitPoints = 100;
+
+ void genRandomSplitPoints(vector<int>* splitPoints) {
+ for (int i = 0; i < numSplitPoints; i++) {
+ splitPoints->push_back(rand(numSplitPoints * 10));
+ }
+ }
+
+ void genRandomSplitKeys(const string& keyName, vector<BSONObj>* splitKeys) {
+ vector<int> splitPoints;
+ genRandomSplitPoints(&splitPoints);
+
+ for (vector<int>::iterator it = splitPoints.begin(); it != splitPoints.end(); ++it) {
+ splitKeys->push_back(BSON(keyName << *it));
+ }
+ }
+
+ // Uses a chunk manager to create chunks
+ void createChunks(const string& keyName) {
+ vector<BSONObj> splitKeys;
+ genRandomSplitKeys(keyName, &splitKeys);
+
+ ShardKeyPattern shardKeyPattern(BSON(keyName << 1));
+ ChunkManager manager(_collName, shardKeyPattern, false);
+
+ manager.createFirstChunks(_shardId, &splitKeys, NULL);
+ }
+};
+
+const ShardId ChunkManagerFixture::_shardId{"shard0000"};
+const string ChunkManagerFixture::_collName{"foo.bar"};
+
+// Rename the fixture so that our tests have a useful name in the executable
+typedef ChunkManagerFixture ChunkManagerTests;
+
+/**
+ * Tests creating a new chunk manager with random split points. Creating chunks on multiple shards
+ * is not tested here since there are unresolved race conditions there and probably should be
+ * avoided if at all possible.
+ */
+TEST_F(ChunkManagerTests, FullTest) {
+ string keyName = "_id";
+ createChunks(keyName);
+
+ unique_ptr<DBClientCursor> cursor =
+ _client.query(ChunkType::ConfigNS, QUERY(ChunkType::ns(_collName)));
+
+ set<int> minorVersions;
+ OID epoch;
+
+ // Check that all chunks were created with version 1|x with consistent epoch and unique
+ // minor versions
+ while (cursor->more()) {
+ BSONObj chunk = cursor->next();
+
+ ChunkVersion version = ChunkVersion::fromBSON(chunk, ChunkType::DEPRECATED_lastmod());
+
+ ASSERT(version.majorVersion() == 1);
+ ASSERT(version.isEpochSet());
+
+ if (!epoch.isSet())
+ epoch = version.epoch();
+ ASSERT(version.epoch() == epoch);
+
+ ASSERT(minorVersions.find(version.minorVersion()) == minorVersions.end());
+ minorVersions.insert(version.minorVersion());
+
+ ASSERT(chunk[ChunkType::shard()].String() == _shardId);
+ }
+}
+
+/**
+ * Tests that chunks are loaded correctly from the db with no a priori info and also that they can
+ * be reloaded on top of an old chunk manager with changes.
+ */
+TEST_F(ChunkManagerTests, Basic) {
+ string keyName = "_id";
+ createChunks(keyName);
+ int numChunks =
+ static_cast<int>(_client.count(ChunkType::ConfigNS, BSON(ChunkType::ns(_collName))));
+
+ BSONObj firstChunk = _client.findOne(ChunkType::ConfigNS, BSONObj()).getOwned();
+
+ ChunkVersion version = ChunkVersion::fromBSON(firstChunk, ChunkType::DEPRECATED_lastmod());
+
+ // Make manager load existing chunks
+ CollectionType collType;
+ collType.setNs(NamespaceString{_collName});
+ collType.setEpoch(version.epoch());
+ collType.setUpdatedAt(jsTime());
+ collType.setKeyPattern(BSON("_id" << 1));
+ collType.setUnique(false);
+ collType.setDropped(false);
+
+ ChunkManager manager(collType);
+ manager.loadExistingRanges(nullptr);
+
+ ASSERT(manager.getVersion().epoch() == version.epoch());
+ ASSERT(manager.getVersion().minorVersion() == (numChunks - 1));
+ ASSERT(static_cast<int>(manager.getChunkMap().size()) == numChunks);
+
+ // Modify chunks collection
+ BSONObjBuilder b;
+ ChunkVersion laterVersion = ChunkVersion(2, 1, version.epoch());
+ laterVersion.addToBSON(b, ChunkType::DEPRECATED_lastmod());
+
+ _client.update(ChunkType::ConfigNS, BSONObj(), BSON("$set" << b.obj()));
+
+ // Make new manager load chunk diff
+ ChunkManager newManager(manager.getns(), manager.getShardKeyPattern(), manager.isUnique());
+ newManager.loadExistingRanges(&manager);
+
+ ASSERT(newManager.getVersion().toLong() == laterVersion.toLong());
+ ASSERT(newManager.getVersion().epoch() == laterVersion.epoch());
+ ASSERT(static_cast<int>(newManager.getChunkMap().size()) == numChunks);
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/dbtests/config_server_fixture.cpp b/src/mongo/dbtests/config_server_fixture.cpp
index 0c6705c19a6..019110915a5 100644
--- a/src/mongo/dbtests/config_server_fixture.cpp
+++ b/src/mongo/dbtests/config_server_fixture.cpp
@@ -40,6 +40,7 @@
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog/legacy/legacy_dist_lock_manager.h"
#include "mongo/s/catalog/type_config_version.h"
+#include "mongo/s/client/shard_connection.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"
@@ -56,6 +57,7 @@ string ConfigServerFixture::shardName() {
}
void ConfigServerFixture::setUp() {
+ shardConnectionPool.clear();
DBException::traceExceptions = true;
// Make all connections redirect to the direct client
@@ -75,7 +77,8 @@ void ConfigServerFixture::setUp() {
ChunkType::ConfigNS,
BSON(ChunkType::ns() << 1 << ChunkType::DEPRECATED_lastmod() << 1)));
- ConnectionString connStr(uassertStatusOK(ConnectionString::parse("$dummy:10000")));
+ const ConnectionString connStr(uassertStatusOK(ConnectionString::parse("$dummy:10000")));
+
ShardingState::get(getGlobalServiceContext())->initialize(connStr.toString());
ShardingState::get(getGlobalServiceContext())->gotShardName(shardName());
}
diff --git a/src/mongo/dbtests/config_server_fixture.h b/src/mongo/dbtests/config_server_fixture.h
index d06ac0c4026..baddd1ea903 100644
--- a/src/mongo/dbtests/config_server_fixture.h
+++ b/src/mongo/dbtests/config_server_fixture.h
@@ -118,8 +118,8 @@ protected:
CustomDirectClient _client;
CustomConnectHook* _connectHook;
-private:
virtual void setUp();
virtual void tearDown();
};
-}
+
+} // namespace mongo
diff --git a/src/mongo/dbtests/config_upgrade_tests.cpp b/src/mongo/dbtests/config_upgrade_tests.cpp
index f0961f21fc3..7e1eb958c46 100644
--- a/src/mongo/dbtests/config_upgrade_tests.cpp
+++ b/src/mongo/dbtests/config_upgrade_tests.cpp
@@ -44,13 +44,13 @@ namespace mongo {
using std::string;
+namespace {
+
/**
* Specialization of the config server fixture with helpers for the tests below.
*/
class ConfigUpgradeFixture : public ConfigServerFixture {
public:
- ConfigUpgradeFixture() : ConfigServerFixture() {}
-
void stopBalancer() {
// Note: The balancer key is needed in the update portion, for some reason related to
// DBDirectClient
@@ -265,4 +265,5 @@ TEST_F(ConfigUpgradeTests, CheckMongoVersion) {
ASSERT(status.code() == ErrorCodes::RemoteValidationError);
}
-} // end namespace
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/dbtests/merge_chunk_tests.cpp b/src/mongo/dbtests/merge_chunk_tests.cpp
index c80692f0271..377f57ad695 100644
--- a/src/mongo/dbtests/merge_chunk_tests.cpp
+++ b/src/mongo/dbtests/merge_chunk_tests.cpp
@@ -45,13 +45,13 @@ namespace mongo {
using std::string;
using std::vector;
+namespace {
+
/**
* Specialization of the config server fixture with helpers for the tests below.
*/
class MergeChunkFixture : public ConfigServerFixture {
public:
- MergeChunkFixture() : ConfigServerFixture() {}
-
/**
* Stores ranges for a particular collection and shard starting from some version
*/
@@ -340,4 +340,5 @@ TEST_F(MergeChunkTests, CompoundMerge) {
assertWrittenAsMerged(ranges);
}
+} // namespace
} // namespace mongo
diff --git a/src/mongo/dbtests/sharding.cpp b/src/mongo/dbtests/sharding.cpp
deleted file mode 100644
index 3acaa0c4dba..00000000000
--- a/src/mongo/dbtests/sharding.cpp
+++ /dev/null
@@ -1,259 +0,0 @@
-/**
- * Copyright (C) 2009 10gen Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/operation_context_impl.h"
-#include "mongo/dbtests/config_server_fixture.h"
-#include "mongo/dbtests/dbtests.h"
-#include "mongo/s/catalog/type_chunk.h"
-#include "mongo/s/catalog/type_collection.h"
-#include "mongo/s/catalog/type_shard.h"
-#include "mongo/s/client/shard_connection.h"
-#include "mongo/s/chunk_manager.h"
-
-namespace {
-
-using std::shared_ptr;
-using std::unique_ptr;
-using std::make_pair;
-using std::map;
-using std::pair;
-using std::set;
-using std::string;
-using std::vector;
-
-static int rand(int max = -1) {
- static unsigned seed = 1337;
-
-#if !defined(_WIN32)
- int r = rand_r(&seed);
-#else
- int r = ::rand(); // seed not used in this case
-#endif
-
- // Modding is bad, but don't really care in this case
- return max > 0 ? r % max : r;
-}
-
-//
-// Sets up a basic environment for loading chunks to/from the direct database connection
-// Redirects connections to the direct database for the duration of the test.
-//
-class ChunkManagerTest : public ConnectionString::ConnectionHook {
-public:
- ChunkManagerTest() : _client(&_txn) {
- shardConnectionPool.clear();
-
- DBException::traceExceptions = true;
-
- // Make all connections redirect to the direct client
- ConnectionString::setConnectionHook(this);
-
- // Create the default config database before querying, necessary for direct connections
- _client.dropDatabase("config");
- _client.insert("config.test",
- BSON("hello"
- << "world"));
- _client.dropCollection("config.test");
-
- _client.dropDatabase(nsGetDB(collName()));
- _client.insert(collName(),
- BSON("hello"
- << "world"));
- _client.dropCollection(collName());
-
- _shardId = "shard0000";
-
- // Add dummy shard to config DB
- _client.insert(ShardType::ConfigNS,
- BSON(ShardType::name()
- << _shardId << ShardType::host()
- << ConnectionString(HostAndPort("$hostFooBar:27017")).toString()));
-
- // Create an index so that diffing works correctly, otherwise no cursors from S&O
- ASSERT_OK(dbtests::createIndex(&_txn,
- ChunkType::ConfigNS,
- BSON(ChunkType::ns() << 1 << // br
- ChunkType::DEPRECATED_lastmod() << 1)));
- }
-
- virtual ~ChunkManagerTest() {
- // Reset the redirection
- ConnectionString::setConnectionHook(NULL);
- }
-
- string collName() {
- return "foo.bar";
- }
-
- virtual DBClientBase* connect(const ConnectionString& connStr,
- string& errmsg,
- double socketTimeout) {
- // Note - must be new, since it gets owned elsewhere
- return new CustomDirectClient(&_txn);
- }
-
-
-protected:
- OperationContextImpl _txn;
- CustomDirectClient _client;
- ShardId _shardId;
-};
-
-//
-// Tests creating a new chunk manager with random split points. Creating chunks on multiple shards
-// is not tested here since there are unresolved race conditions there and probably should be
-// avoided if at all possible.
-//
-class ChunkManagerCreateFullTest : public ChunkManagerTest {
-public:
- static const int numSplitPoints = 100;
-
- void genRandomSplitPoints(vector<int>* splitPoints) {
- for (int i = 0; i < numSplitPoints; i++) {
- splitPoints->push_back(rand(numSplitPoints * 10));
- }
- }
-
- void genRandomSplitKeys(const string& keyName, vector<BSONObj>* splitKeys) {
- vector<int> splitPoints;
- genRandomSplitPoints(&splitPoints);
-
- for (vector<int>::iterator it = splitPoints.begin(); it != splitPoints.end(); ++it) {
- splitKeys->push_back(BSON(keyName << *it));
- }
- }
-
- // Uses a chunk manager to create chunks
- void createChunks(const string& keyName) {
- vector<BSONObj> splitKeys;
- genRandomSplitKeys(keyName, &splitKeys);
-
- ShardKeyPattern shardKeyPattern(BSON(keyName << 1));
- ChunkManager manager(collName(), shardKeyPattern, false);
-
- manager.createFirstChunks(_shardId, &splitKeys, NULL);
- }
-
- void run() {
- string keyName = "_id";
- createChunks(keyName);
-
- unique_ptr<DBClientCursor> cursor =
- _client.query(ChunkType::ConfigNS, QUERY(ChunkType::ns(collName())));
-
- set<int> minorVersions;
- OID epoch;
-
- // Check that all chunks were created with version 1|x with consistent epoch and unique
- // minor versions
- while (cursor->more()) {
- BSONObj chunk = cursor->next();
-
- ChunkVersion version = ChunkVersion::fromBSON(chunk, ChunkType::DEPRECATED_lastmod());
-
- ASSERT(version.majorVersion() == 1);
- ASSERT(version.isEpochSet());
-
- if (!epoch.isSet())
- epoch = version.epoch();
- ASSERT(version.epoch() == epoch);
-
- ASSERT(minorVersions.find(version.minorVersion()) == minorVersions.end());
- minorVersions.insert(version.minorVersion());
-
- ASSERT(chunk[ChunkType::shard()].String() == _shardId);
- }
- }
-};
-
-//
-// Tests that chunks are loaded correctly from the db with no a-priori info and also that they can
-// be reloaded on top of an old chunk manager with changes.
-//
-class ChunkManagerLoadBasicTest : public ChunkManagerCreateFullTest {
-public:
- void run() {
- string keyName = "_id";
- createChunks(keyName);
- int numChunks =
- static_cast<int>(_client.count(ChunkType::ConfigNS, BSON(ChunkType::ns(collName()))));
-
- BSONObj firstChunk = _client.findOne(ChunkType::ConfigNS, BSONObj()).getOwned();
-
- ChunkVersion version = ChunkVersion::fromBSON(firstChunk, ChunkType::DEPRECATED_lastmod());
-
- // Make manager load existing chunks
- CollectionType collType;
- collType.setNs(NamespaceString{collName()});
- collType.setEpoch(version.epoch());
- collType.setUpdatedAt(jsTime());
- collType.setKeyPattern(BSON("_id" << 1));
- collType.setUnique(false);
- collType.setDropped(false);
-
- ChunkManager manager(collType);
- manager.loadExistingRanges(nullptr);
-
- ASSERT(manager.getVersion().epoch() == version.epoch());
- ASSERT(manager.getVersion().minorVersion() == (numChunks - 1));
- ASSERT(static_cast<int>(manager.getChunkMap().size()) == numChunks);
-
- // Modify chunks collection
- BSONObjBuilder b;
- ChunkVersion laterVersion = ChunkVersion(2, 1, version.epoch());
- laterVersion.addToBSON(b, ChunkType::DEPRECATED_lastmod());
-
- _client.update(ChunkType::ConfigNS, BSONObj(), BSON("$set" << b.obj()));
-
- // Make new manager load chunk diff
- ChunkManager newManager(manager.getns(), manager.getShardKeyPattern(), manager.isUnique());
- newManager.loadExistingRanges(&manager);
-
- ASSERT(newManager.getVersion().toLong() == laterVersion.toLong());
- ASSERT(newManager.getVersion().epoch() == laterVersion.epoch());
- ASSERT(static_cast<int>(newManager.getChunkMap().size()) == numChunks);
- }
-};
-
-class All : public Suite {
-public:
- All() : Suite("sharding") {}
-
- void setupTests() {
- add<ChunkManagerCreateFullTest>();
- add<ChunkManagerLoadBasicTest>();
- }
-};
-
-SuiteInstance<All> myall;
-
-} // namespace
diff --git a/src/mongo/s/client/dbclient_multi_command.cpp b/src/mongo/s/client/dbclient_multi_command.cpp
index ca4d79de8b1..3fc3dfabee8 100644
--- a/src/mongo/s/client/dbclient_multi_command.cpp
+++ b/src/mongo/s/client/dbclient_multi_command.cpp
@@ -30,7 +30,6 @@
#include "mongo/s/client/dbclient_multi_command.h"
-
#include "mongo/db/audit.h"
#include "mongo/db/dbmessage.h"
#include "mongo/db/wire_version.h"
@@ -38,6 +37,7 @@
#include "mongo/rpc/request_builder_interface.h"
#include "mongo/s/client/shard_connection.h"
#include "mongo/s/write_ops/batched_command_request.h"
+#include "mongo/stdx/memory.h"
#include "mongo/util/net/message.h"
namespace mongo {
@@ -46,22 +46,6 @@ using std::unique_ptr;
using std::deque;
using std::string;
-DBClientMultiCommand::PendingCommand::PendingCommand(const ConnectionString& endpoint,
- StringData dbName,
- const BSONObj& cmdObj)
- : endpoint(endpoint),
- dbName(dbName.toString()),
- cmdObj(cmdObj),
- conn(NULL),
- status(Status::OK()) {}
-
-void DBClientMultiCommand::addCommand(const ConnectionString& endpoint,
- StringData dbName,
- const BSONObj& request) {
- PendingCommand* command = new PendingCommand(endpoint, dbName, request);
- _pendingCommands.push_back(command);
-}
-
namespace {
//
@@ -133,44 +117,52 @@ static void recvAsCmd(DBClientBase* conn, Message* toRecv, BSONObj* result) {
*result = reply->getCommandReply();
}
+DBClientMultiCommand::DBClientMultiCommand() = default;
+
+DBClientMultiCommand::~DBClientMultiCommand() {
+ // Cleanup anything outstanding, do *not* return stuff to the pool, that might error
+ for (deque<PendingCommand*>::iterator it = _pendingCommands.begin();
+ it != _pendingCommands.end();
+ ++it) {
+ PendingCommand* command = *it;
+ delete command;
+ }
+
+ _pendingCommands.clear();
+}
+
+void DBClientMultiCommand::addCommand(const ConnectionString& endpoint,
+ StringData dbName,
+ const BSONObj& request) {
+ PendingCommand* command = new PendingCommand(endpoint, dbName, request);
+ _pendingCommands.push_back(command);
+}
+
void DBClientMultiCommand::sendAll() {
for (deque<PendingCommand*>::iterator it = _pendingCommands.begin();
it != _pendingCommands.end();
++it) {
PendingCommand* command = *it;
- dassert(NULL == command->conn);
+ dassert(!command->conn);
try {
dassert(command->endpoint.type() == ConnectionString::MASTER ||
command->endpoint.type() == ConnectionString::CUSTOM);
- // TODO: Fix the pool up to take millis directly
- int timeoutSecs = _timeoutMillis / 1000;
- command->conn = shardConnectionPool.get(command->endpoint, timeoutSecs);
+ command->conn = stdx::make_unique<ShardConnection>(command->endpoint, "");
// Sanity check if we're sending a batch write that we're talking to a new-enough
// server.
massert(28563,
str::stream() << "cannot send batch write operation to server "
- << command->conn->toString(),
- !isBatchWriteCommand(command->cmdObj) || hasBatchWriteFeature(command->conn));
+ << command->conn->get()->toString(),
+ !isBatchWriteCommand(command->cmdObj) ||
+ hasBatchWriteFeature(command->conn->get()));
- sayAsCmd(command->conn, command->dbName, command->cmdObj);
+ sayAsCmd(command->conn->get(), command->dbName, command->cmdObj);
} catch (const DBException& ex) {
command->status = ex.toStatus();
-
- if (NULL != command->conn) {
- // Confusingly, the pool needs to know about failed connections so that it can
- // invalidate other connections which might be bad. But if the connection
- // doesn't seem bad, don't send it back, because we don't want to reuse it.
- if (!command->conn->isFailed()) {
- delete command->conn;
- } else {
- shardConnectionPool.release(command->endpoint.toString(), command->conn);
- }
-
- command->conn = NULL;
- }
+ command->conn.reset();
}
}
}
@@ -187,56 +179,34 @@ Status DBClientMultiCommand::recvAny(ConnectionString* endpoint, BSONSerializabl
if (!command->status.isOK())
return command->status;
- dassert(NULL != command->conn);
+ dassert(command->conn);
try {
// Holds the data and BSONObj for the command result
Message toRecv;
BSONObj result;
- recvAsCmd(command->conn, &toRecv, &result);
-
- shardConnectionPool.release(command->endpoint.toString(), command->conn);
- command->conn = NULL;
+ recvAsCmd(command->conn->get(), &toRecv, &result);
+ command->conn->done();
+ command->conn.reset();
string errMsg;
if (!response->parseBSON(result, &errMsg) || !response->isValid(&errMsg)) {
return Status(ErrorCodes::FailedToParse, errMsg);
}
} catch (const DBException& ex) {
- // Confusingly, the pool needs to know about failed connections so that it can
- // invalidate other connections which might be bad. But if the connection doesn't seem
- // bad, don't send it back, because we don't want to reuse it.
- if (!command->conn->isFailed()) {
- delete command->conn;
- } else {
- shardConnectionPool.release(command->endpoint.toString(), command->conn);
- }
- command->conn = NULL;
-
+ command->conn.reset();
return ex.toStatus();
}
return Status::OK();
}
-DBClientMultiCommand::~DBClientMultiCommand() {
- // Cleanup anything outstanding, do *not* return stuff to the pool, that might error
- for (deque<PendingCommand*>::iterator it = _pendingCommands.begin();
- it != _pendingCommands.end();
- ++it) {
- PendingCommand* command = *it;
-
- if (NULL != command->conn)
- delete command->conn;
- delete command;
- command = NULL;
- }
+DBClientMultiCommand::PendingCommand::PendingCommand(const ConnectionString& endpoint,
+ StringData dbName,
+ const BSONObj& cmdObj)
+ : endpoint(endpoint), dbName(dbName.toString()), cmdObj(cmdObj), status(Status::OK()) {}
- _pendingCommands.clear();
-}
+DBClientMultiCommand::PendingCommand::~PendingCommand() = default;
-void DBClientMultiCommand::setTimeoutMillis(int milliSecs) {
- _timeoutMillis = milliSecs;
-}
-}
+} // namespace mongo
diff --git a/src/mongo/s/client/dbclient_multi_command.h b/src/mongo/s/client/dbclient_multi_command.h
index ab0f477d885..84f10a061e9 100644
--- a/src/mongo/s/client/dbclient_multi_command.h
+++ b/src/mongo/s/client/dbclient_multi_command.h
@@ -35,6 +35,8 @@
namespace mongo {
+class ShardConnection;
+
/**
* A DBClientMultiCommand uses the client driver (DBClientConnections) to send and recv
* commands to different hosts in parallel.
@@ -43,8 +45,7 @@ namespace mongo {
*/
class DBClientMultiCommand : public MultiCommandDispatch {
public:
- DBClientMultiCommand() : _timeoutMillis(0) {}
-
+ DBClientMultiCommand();
~DBClientMultiCommand();
void addCommand(const ConnectionString& endpoint,
@@ -57,12 +58,11 @@ public:
Status recvAny(ConnectionString* endpoint, BSONSerializable* response) override;
- void setTimeoutMillis(int milliSecs);
-
private:
// All info associated with a pre- or in-flight command
struct PendingCommand {
PendingCommand(const ConnectionString& endpoint, StringData dbName, const BSONObj& cmdObj);
+ ~PendingCommand();
// What to send
const ConnectionString endpoint;
@@ -70,7 +70,7 @@ private:
const BSONObj cmdObj;
// Where to send it
- DBClientBase* conn;
+ std::unique_ptr<ShardConnection> conn;
// If anything goes wrong
Status status;
@@ -78,6 +78,6 @@ private:
typedef std::deque<PendingCommand*> PendingQueue;
PendingQueue _pendingCommands;
- int _timeoutMillis;
};
-}
+
+} // namespace mongo
diff --git a/src/mongo/s/client/shard_connection.cpp b/src/mongo/s/client/shard_connection.cpp
index 81d64dfb3b8..c6d765afb5a 100644
--- a/src/mongo/s/client/shard_connection.cpp
+++ b/src/mongo/s/client/shard_connection.cpp
@@ -409,12 +409,19 @@ DBConnectionPool shardConnectionPool;
// Different between mongos and mongod
void usingAShardConnection(const string& addr);
-
ShardConnection::ShardConnection(const ConnectionString& connectionString,
const string& ns,
std::shared_ptr<ChunkManager> manager)
- : _cs(connectionString), _ns(ns), _manager(manager) {
- _init();
+ : _cs(connectionString), _ns(ns), _manager(manager), _finishedInit(false) {
+ invariant(_cs.isValid());
+
+ // Make sure we specified a manager for the correct namespace
+ if (_ns.size() && _manager) {
+ invariant(_manager->getns() == _ns);
+ }
+
+ _conn = ClientConnections::threadInstance()->get(_cs.toString(), _ns);
+ usingAShardConnection(_cs.toString());
}
ShardConnection::~ShardConnection() {
@@ -437,22 +444,12 @@ ShardConnection::~ShardConnection() {
}
}
-void ShardConnection::_init() {
- invariant(_cs.isValid());
- _conn = ClientConnections::threadInstance()->get(_cs.toString(), _ns);
- _finishedInit = false;
- usingAShardConnection(_cs.toString());
-}
-
void ShardConnection::_finishInit() {
if (_finishedInit)
return;
_finishedInit = true;
if (versionManager.isVersionableCB(_conn)) {
- // Make sure we specified a manager for the correct namespace
- if (_ns.size() && _manager)
- verify(_manager->getns() == _ns);
_setVersion = versionManager.checkShardVersionCB(this, false, 1);
} else {
// Make sure we didn't specify a manager for a non-versionable connection (i.e. config)
diff --git a/src/mongo/s/client/shard_connection.h b/src/mongo/s/client/shard_connection.h
index c6731945ec1..8882961bb1c 100644
--- a/src/mongo/s/client/shard_connection.h
+++ b/src/mongo/s/client/shard_connection.h
@@ -123,13 +123,11 @@ public:
static void forgetNS(const std::string& ns);
private:
- void _init();
void _finishInit();
const ConnectionString _cs;
const std::string _ns;
-
- std::shared_ptr<ChunkManager> _manager;
+ const std::shared_ptr<ChunkManager> _manager;
bool _finishedInit;
diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp
index 156f736413a..17b2abfd7bc 100644
--- a/src/mongo/s/commands/commands_public.cpp
+++ b/src/mongo/s/commands/commands_public.cpp
@@ -33,7 +33,6 @@
#include "mongo/platform/basic.h"
#include "mongo/client/connpool.h"
-#include "mongo/client/parallel.h"
#include "mongo/db/auth/action_set.h"
#include "mongo/db/auth/action_type.h"
#include "mongo/db/auth/authorization_manager.h"
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp
index 79d27a87e0f..4a1653e4fcb 100644
--- a/src/mongo/s/server.cpp
+++ b/src/mongo/s/server.cpp
@@ -108,11 +108,6 @@ bool inShutdown() {
return dbexitCalled;
}
-bool haveLocalShardingInfo(Client* client, const string& ns) {
- verify(0);
- return false;
-}
-
static BSONObj buildErrReply(const DBException& ex) {
BSONObjBuilder errB;
errB.append("$err", ex.what());
@@ -213,11 +208,12 @@ static Status initializeSharding(bool doUpgrade) {
if (!status.isOK()) {
return status;
}
+
return Status::OK();
}
static ExitCode runMongosServer(bool doUpgrade) {
- setThreadName("mongosMain");
+ Client::initThread("mongosMain");
printShardingVersionInfo(false);
// Add sharding hooks to both connection pools - ShardingConnectionHook includes auth hooks
diff --git a/src/mongo/s/version_manager.cpp b/src/mongo/s/version_manager.cpp
index d3cb5ddc875..8faed905b60 100644
--- a/src/mongo/s/version_manager.cpp
+++ b/src/mongo/s/version_manager.cpp
@@ -196,9 +196,11 @@ bool initShardVersionEmptyNS(DBClientBase* conn_in) {
conn = getVersionable(conn_in);
dassert(conn); // errors thrown above
- // Check to see if we've already initialized this connection
- if (connectionShardStatus.hasAnySequenceSet(conn))
+ // Check to see if we've already initialized this connection. This avoids sending
+ // setShardVersion multiple times.
+ if (connectionShardStatus.hasAnySequenceSet(conn)) {
return false;
+ }
// Check to see if this is actually a shard and not a single config server
// NOTE: Config servers are registered only by the name "config" in the shard cache, not
diff --git a/src/mongo/shell/shardingtest.js b/src/mongo/shell/shardingtest.js
index a24949f58c3..79b7049fc9a 100644
--- a/src/mongo/shell/shardingtest.js
+++ b/src/mongo/shell/shardingtest.js
@@ -1137,9 +1137,16 @@ ShardingTest.prototype.stopMongos = function(n) {
};
/**
- * Restarts a previously stopped mongos using the same parameter as before.
+ * Kills the mongod with index n.
+ */
+ShardingTest.prototype.stopMongod = function(n) {
+ MongoRunner.stopMongod(this['d' + n].port);
+};
+
+/**
+ * Restarts a previously stopped mongos using the same parameters as before.
*
- * Warning: Overwrites the old s (if n = 0) and sn member variables
+ * Warning: Overwrites the old s (if n = 0) and sn member variables.
*/
ShardingTest.prototype.restartMongos = function(n) {
this.stopMongos(n);
@@ -1152,6 +1159,22 @@ ShardingTest.prototype.restartMongos = function(n) {
};
/**
+ * Restarts a previously stopped mongod using the same parameters as before.
+ *
+ * Warning: Overwrites the old dn member variables.
+ */
+ShardingTest.prototype.restartMongod = function(n) {
+ this.stopMongod(n);
+
+ var cmdLine = this['d' + n].commandLine;
+ cmdLine['restart'] = true;
+
+ var newConn = MongoRunner.runMongod(cmdLine);
+
+ this['d' + n] = newConn;
+};
+
+/**
* Helper method for setting primary shard of a database and making sure that it was successful.
* Note: first mongos needs to be up.
*/