diff options
-rw-r--r-- | jstests/sharding/write_commands_sharding_state.js | 79 | ||||
-rw-r--r-- | src/mongo/client/scoped_db_conn_test.cpp | 4 | ||||
-rw-r--r-- | src/mongo/dbtests/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/dbtests/chunk_manager_tests.cpp | 207 | ||||
-rw-r--r-- | src/mongo/dbtests/config_server_fixture.cpp | 5 | ||||
-rw-r--r-- | src/mongo/dbtests/config_server_fixture.h | 4 | ||||
-rw-r--r-- | src/mongo/dbtests/config_upgrade_tests.cpp | 7 | ||||
-rw-r--r-- | src/mongo/dbtests/merge_chunk_tests.cpp | 5 | ||||
-rw-r--r-- | src/mongo/dbtests/sharding.cpp | 259 | ||||
-rw-r--r-- | src/mongo/s/client/dbclient_multi_command.cpp | 110 | ||||
-rw-r--r-- | src/mongo/s/client/dbclient_multi_command.h | 14 | ||||
-rw-r--r-- | src/mongo/s/client/shard_connection.cpp | 23 | ||||
-rw-r--r-- | src/mongo/s/client/shard_connection.h | 4 | ||||
-rw-r--r-- | src/mongo/s/commands/commands_public.cpp | 1 | ||||
-rw-r--r-- | src/mongo/s/server.cpp | 8 | ||||
-rw-r--r-- | src/mongo/s/version_manager.cpp | 6 | ||||
-rw-r--r-- | src/mongo/shell/shardingtest.js | 27 |
17 files changed, 389 insertions, 376 deletions
diff --git a/jstests/sharding/write_commands_sharding_state.js b/jstests/sharding/write_commands_sharding_state.js new file mode 100644 index 00000000000..99fbe92ee42 --- /dev/null +++ b/jstests/sharding/write_commands_sharding_state.js @@ -0,0 +1,79 @@ +(function() {
+
+'use strict';
+
+var st = new ShardingTest({name: "write_commands", mongos: 2, shards: 2 });
+st.stopBalancer();
+
+var dbTestName = 'WriteCommandsTestDB';
+
+assert.commandWorked(st.s0.adminCommand({ enablesharding: dbTestName }));
+st.ensurePrimaryShard(dbTestName, 'shard0000');
+
+assert.commandWorked(st.s0.adminCommand({ shardCollection: dbTestName + '.TestColl',
+ key: { Key: 1 },
+ unique: true }));
+
+// Split at keys 10 and 20
+assert.commandWorked(st.s0.adminCommand({ split: dbTestName + '.TestColl', middle: { Key: 10 } }));
+assert.commandWorked(st.s0.adminCommand({ split: dbTestName + '.TestColl', middle: { Key: 20 } }));
+
+printjson(st.config.getSiblingDB('config').chunks.find().toArray());
+
+// Move < 10 to shard0000, 10 and 20 to shard00001
+st.s0.adminCommand({ moveChunk: dbTestName + '.TestColl', find: { Key: 0 }, to: 'shard0000' });
+st.s0.adminCommand({ moveChunk: dbTestName + '.TestColl', find: { Key: 19 }, to: 'shard0001' });
+st.s0.adminCommand({ moveChunk: dbTestName + '.TestColl', find: { Key: 21 }, to: 'shard0001' });
+
+printjson(st.config.getSiblingDB('config').chunks.find().toArray());
+
+// Insert one document in each chunk, which we will use to change
+assert(st.s1.getDB(dbTestName).TestColl.insert({ Key: 1 }));
+assert(st.s1.getDB(dbTestName).TestColl.insert({ Key: 11 }));
+assert(st.s1.getDB(dbTestName).TestColl.insert({ Key: 21 }));
+
+// Make sure the documents are correctly placed
+printjson(st.d0.getDB(dbTestName).TestColl.find().toArray());
+printjson(st.d1.getDB(dbTestName).TestColl.find().toArray());
+
+assert.eq(1, st.d0.getDB(dbTestName).TestColl.count());
+assert.eq(2, st.d1.getDB(dbTestName).TestColl.count());
+
+assert.eq(1, st.d0.getDB(dbTestName).TestColl.find({ Key: 1 }).count());
+assert.eq(1, st.d1.getDB(dbTestName).TestColl.find({ Key: 11 }).count());
+assert.eq(1, st.d1.getDB(dbTestName).TestColl.find({ Key: 21 }).count());
+
+// Move chunk [0, 19] to shard0000 and make sure the documents are correctly placed
+st.s0.adminCommand({ moveChunk: dbTestName + '.TestColl', find: { Key: 19 }, to: 'shard0000' });
+
+printjson(st.config.getSiblingDB('config').chunks.find().toArray());
+printjson(st.d0.getDB(dbTestName).TestColl.find({}).toArray());
+printjson(st.d1.getDB(dbTestName).TestColl.find({}).toArray());
+
+// Now restart all mongod instances, so they don't know yet that they are sharded
+st.restartMongod(0);
+st.restartMongod(1);
+
+// Now that both mongod shards are restarted, they don't know yet that they are part of a sharded
+// cluster until they get a setShardVerion command. Mongos instance s1 has stale metadata and
+// doesn't know that chunk with key 19 has moved to shard0000 so it will send it to shard0001 at
+// first.
+//
+// Shard0001 would only send back a stale config exception if it receives a setShardVersion
+// command. The bug that this test validates is that setShardVersion is indeed being sent (for more
+// information, see SERVER-19395).
+st.s1.getDB(dbTestName).TestColl.update({ Key: 11 }, { $inc: { Counter: 1 } }, { upsert: true });
+
+printjson(st.d0.getDB(dbTestName).TestColl.find({}).toArray());
+printjson(st.d1.getDB(dbTestName).TestColl.find({}).toArray());
+
+assert.eq(2, st.d0.getDB(dbTestName).TestColl.count());
+assert.eq(1, st.d1.getDB(dbTestName).TestColl.count());
+
+assert.eq(1, st.d0.getDB(dbTestName).TestColl.find({ Key: 1 }).count());
+assert.eq(1, st.d0.getDB(dbTestName).TestColl.find({ Key: 11 }).count());
+assert.eq(1, st.d1.getDB(dbTestName).TestColl.find({ Key: 21 }).count());
+
+st.stop();
+
+})();
diff --git a/src/mongo/client/scoped_db_conn_test.cpp b/src/mongo/client/scoped_db_conn_test.cpp index bfdc8dab4dc..59f02f20a1a 100644 --- a/src/mongo/client/scoped_db_conn_test.cpp +++ b/src/mongo/client/scoped_db_conn_test.cpp @@ -98,10 +98,6 @@ void exitCleanly(ExitCode rc) { dbexit(rc, ""); } -bool haveLocalShardingInfo(Client* client, const string& ns) { - return false; -} - namespace { const string TARGET_HOST = "localhost:27017"; diff --git a/src/mongo/dbtests/SConscript b/src/mongo/dbtests/SConscript index b87e4f5672c..45250818f15 100644 --- a/src/mongo/dbtests/SConscript +++ b/src/mongo/dbtests/SConscript @@ -45,6 +45,7 @@ dbtest = env.Program( source=[ 'basictests.cpp', 'chunktests.cpp', + 'chunk_manager_tests.cpp', 'clienttests.cpp', 'commandtests.cpp', 'config_server_fixture.cpp', @@ -94,7 +95,6 @@ dbtest = env.Program( 'replica_set_monitor_test.cpp', 'repltests.cpp', 'rollbacktests.cpp', - 'sharding.cpp', 'socktests.cpp', 'threadedtests.cpp', 'updatetests.cpp', diff --git a/src/mongo/dbtests/chunk_manager_tests.cpp b/src/mongo/dbtests/chunk_manager_tests.cpp new file mode 100644 index 00000000000..9d64c226df2 --- /dev/null +++ b/src/mongo/dbtests/chunk_manager_tests.cpp @@ -0,0 +1,207 @@ +/** + * Copyright (C) 2009 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault + +#include "mongo/platform/basic.h" + +#include "mongo/db/operation_context_impl.h" +#include "mongo/dbtests/config_server_fixture.h" +#include "mongo/dbtests/dbtests.h" +#include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/catalog/type_collection.h" +#include "mongo/s/catalog/type_shard.h" +#include "mongo/s/chunk_manager.h" + +namespace mongo { + +using std::unique_ptr; +using std::set; +using std::string; +using std::vector; + +namespace { + +static int rand(int max = -1) { + static unsigned seed = 1337; + +#if !defined(_WIN32) + int r = rand_r(&seed); +#else + int r = ::rand(); // seed not used in this case +#endif + + // Modding is bad, but don't really care in this case + return max > 0 ? r % max : r; +} + +/** + * Sets up a basic environment for loading chunks to/from the direct database connection. Redirects + * connections to the direct database for the duration of the test. + */ +class ChunkManagerFixture : public ConfigServerFixture { +public: + void setUp() override { + ConfigServerFixture::setUp(); + + _client.dropDatabase(nsGetDB(_collName)); + _client.insert(_collName, + BSON("hello" + << "world")); + _client.dropCollection(_collName); + + // Add dummy shard to config DB + _client.insert(ShardType::ConfigNS, + BSON(ShardType::name() + << _shardId << ShardType::host() + << ConnectionString(HostAndPort("$hostFooBar:27017")).toString())); + } + +protected: + static const ShardId _shardId; + static const string _collName; + + static const int numSplitPoints = 100; + + void genRandomSplitPoints(vector<int>* splitPoints) { + for (int i = 0; i < numSplitPoints; i++) { + splitPoints->push_back(rand(numSplitPoints * 10)); + } + } + + void genRandomSplitKeys(const string& keyName, vector<BSONObj>* splitKeys) { + vector<int> splitPoints; + genRandomSplitPoints(&splitPoints); + + for (vector<int>::iterator it = splitPoints.begin(); it != splitPoints.end(); ++it) { + splitKeys->push_back(BSON(keyName << *it)); + } + } + + // Uses a chunk manager to create chunks + void createChunks(const string& keyName) { + vector<BSONObj> splitKeys; + genRandomSplitKeys(keyName, &splitKeys); + + ShardKeyPattern shardKeyPattern(BSON(keyName << 1)); + ChunkManager manager(_collName, shardKeyPattern, false); + + manager.createFirstChunks(_shardId, &splitKeys, NULL); + } +}; + +const ShardId ChunkManagerFixture::_shardId{"shard0000"}; +const string ChunkManagerFixture::_collName{"foo.bar"}; + +// Rename the fixture so that our tests have a useful name in the executable +typedef ChunkManagerFixture ChunkManagerTests; + +/** + * Tests creating a new chunk manager with random split points. Creating chunks on multiple shards + * is not tested here since there are unresolved race conditions there and probably should be + * avoided if at all possible. + */ +TEST_F(ChunkManagerTests, FullTest) { + string keyName = "_id"; + createChunks(keyName); + + unique_ptr<DBClientCursor> cursor = + _client.query(ChunkType::ConfigNS, QUERY(ChunkType::ns(_collName))); + + set<int> minorVersions; + OID epoch; + + // Check that all chunks were created with version 1|x with consistent epoch and unique + // minor versions + while (cursor->more()) { + BSONObj chunk = cursor->next(); + + ChunkVersion version = ChunkVersion::fromBSON(chunk, ChunkType::DEPRECATED_lastmod()); + + ASSERT(version.majorVersion() == 1); + ASSERT(version.isEpochSet()); + + if (!epoch.isSet()) + epoch = version.epoch(); + ASSERT(version.epoch() == epoch); + + ASSERT(minorVersions.find(version.minorVersion()) == minorVersions.end()); + minorVersions.insert(version.minorVersion()); + + ASSERT(chunk[ChunkType::shard()].String() == _shardId); + } +} + +/** + * Tests that chunks are loaded correctly from the db with no a-priori info and also that they can + * be reloaded on top of an old chunk manager with changes. + */ +TEST_F(ChunkManagerTests, Basic) { + string keyName = "_id"; + createChunks(keyName); + int numChunks = + static_cast<int>(_client.count(ChunkType::ConfigNS, BSON(ChunkType::ns(_collName)))); + + BSONObj firstChunk = _client.findOne(ChunkType::ConfigNS, BSONObj()).getOwned(); + + ChunkVersion version = ChunkVersion::fromBSON(firstChunk, ChunkType::DEPRECATED_lastmod()); + + // Make manager load existing chunks + CollectionType collType; + collType.setNs(NamespaceString{_collName}); + collType.setEpoch(version.epoch()); + collType.setUpdatedAt(jsTime()); + collType.setKeyPattern(BSON("_id" << 1)); + collType.setUnique(false); + collType.setDropped(false); + + ChunkManager manager(collType); + manager.loadExistingRanges(nullptr); + + ASSERT(manager.getVersion().epoch() == version.epoch()); + ASSERT(manager.getVersion().minorVersion() == (numChunks - 1)); + ASSERT(static_cast<int>(manager.getChunkMap().size()) == numChunks); + + // Modify chunks collection + BSONObjBuilder b; + ChunkVersion laterVersion = ChunkVersion(2, 1, version.epoch()); + laterVersion.addToBSON(b, ChunkType::DEPRECATED_lastmod()); + + _client.update(ChunkType::ConfigNS, BSONObj(), BSON("$set" << b.obj())); + + // Make new manager load chunk diff + ChunkManager newManager(manager.getns(), manager.getShardKeyPattern(), manager.isUnique()); + newManager.loadExistingRanges(&manager); + + ASSERT(newManager.getVersion().toLong() == laterVersion.toLong()); + ASSERT(newManager.getVersion().epoch() == laterVersion.epoch()); + ASSERT(static_cast<int>(newManager.getChunkMap().size()) == numChunks); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/dbtests/config_server_fixture.cpp b/src/mongo/dbtests/config_server_fixture.cpp index 0c6705c19a6..019110915a5 100644 --- a/src/mongo/dbtests/config_server_fixture.cpp +++ b/src/mongo/dbtests/config_server_fixture.cpp @@ -40,6 +40,7 @@ #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/catalog/legacy/legacy_dist_lock_manager.h" #include "mongo/s/catalog/type_config_version.h" +#include "mongo/s/client/shard_connection.h" #include "mongo/stdx/memory.h" #include "mongo/util/log.h" @@ -56,6 +57,7 @@ string ConfigServerFixture::shardName() { } void ConfigServerFixture::setUp() { + shardConnectionPool.clear(); DBException::traceExceptions = true; // Make all connections redirect to the direct client @@ -75,7 +77,8 @@ void ConfigServerFixture::setUp() { ChunkType::ConfigNS, BSON(ChunkType::ns() << 1 << ChunkType::DEPRECATED_lastmod() << 1))); - ConnectionString connStr(uassertStatusOK(ConnectionString::parse("$dummy:10000"))); + const ConnectionString connStr(uassertStatusOK(ConnectionString::parse("$dummy:10000"))); + ShardingState::get(getGlobalServiceContext())->initialize(connStr.toString()); ShardingState::get(getGlobalServiceContext())->gotShardName(shardName()); } diff --git a/src/mongo/dbtests/config_server_fixture.h b/src/mongo/dbtests/config_server_fixture.h index d06ac0c4026..baddd1ea903 100644 --- a/src/mongo/dbtests/config_server_fixture.h +++ b/src/mongo/dbtests/config_server_fixture.h @@ -118,8 +118,8 @@ protected: CustomDirectClient _client; CustomConnectHook* _connectHook; -private: virtual void setUp(); virtual void tearDown(); }; -} + +} // namespace mongo diff --git a/src/mongo/dbtests/config_upgrade_tests.cpp b/src/mongo/dbtests/config_upgrade_tests.cpp index f0961f21fc3..7e1eb958c46 100644 --- a/src/mongo/dbtests/config_upgrade_tests.cpp +++ b/src/mongo/dbtests/config_upgrade_tests.cpp @@ -44,13 +44,13 @@ namespace mongo { using std::string; +namespace { + /** * Specialization of the config server fixture with helpers for the tests below. */ class ConfigUpgradeFixture : public ConfigServerFixture { public: - ConfigUpgradeFixture() : ConfigServerFixture() {} - void stopBalancer() { // Note: The balancer key is needed in the update portion, for some reason related to // DBDirectClient @@ -265,4 +265,5 @@ TEST_F(ConfigUpgradeTests, CheckMongoVersion) { ASSERT(status.code() == ErrorCodes::RemoteValidationError); } -} // end namespace +} // namespace +} // namespace mongo diff --git a/src/mongo/dbtests/merge_chunk_tests.cpp b/src/mongo/dbtests/merge_chunk_tests.cpp index c80692f0271..377f57ad695 100644 --- a/src/mongo/dbtests/merge_chunk_tests.cpp +++ b/src/mongo/dbtests/merge_chunk_tests.cpp @@ -45,13 +45,13 @@ namespace mongo { using std::string; using std::vector; +namespace { + /** * Specialization of the config server fixture with helpers for the tests below. */ class MergeChunkFixture : public ConfigServerFixture { public: - MergeChunkFixture() : ConfigServerFixture() {} - /** * Stores ranges for a particular collection and shard starting from some version */ @@ -340,4 +340,5 @@ TEST_F(MergeChunkTests, CompoundMerge) { assertWrittenAsMerged(ranges); } +} // namespace } // namespace mongo diff --git a/src/mongo/dbtests/sharding.cpp b/src/mongo/dbtests/sharding.cpp deleted file mode 100644 index 3acaa0c4dba..00000000000 --- a/src/mongo/dbtests/sharding.cpp +++ /dev/null @@ -1,259 +0,0 @@ -/** - * Copyright (C) 2009 10gen Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault - -#include "mongo/platform/basic.h" - -#include "mongo/db/operation_context_impl.h" -#include "mongo/dbtests/config_server_fixture.h" -#include "mongo/dbtests/dbtests.h" -#include "mongo/s/catalog/type_chunk.h" -#include "mongo/s/catalog/type_collection.h" -#include "mongo/s/catalog/type_shard.h" -#include "mongo/s/client/shard_connection.h" -#include "mongo/s/chunk_manager.h" - -namespace { - -using std::shared_ptr; -using std::unique_ptr; -using std::make_pair; -using std::map; -using std::pair; -using std::set; -using std::string; -using std::vector; - -static int rand(int max = -1) { - static unsigned seed = 1337; - -#if !defined(_WIN32) - int r = rand_r(&seed); -#else - int r = ::rand(); // seed not used in this case -#endif - - // Modding is bad, but don't really care in this case - return max > 0 ? r % max : r; -} - -// -// Sets up a basic environment for loading chunks to/from the direct database connection -// Redirects connections to the direct database for the duration of the test. -// -class ChunkManagerTest : public ConnectionString::ConnectionHook { -public: - ChunkManagerTest() : _client(&_txn) { - shardConnectionPool.clear(); - - DBException::traceExceptions = true; - - // Make all connections redirect to the direct client - ConnectionString::setConnectionHook(this); - - // Create the default config database before querying, necessary for direct connections - _client.dropDatabase("config"); - _client.insert("config.test", - BSON("hello" - << "world")); - _client.dropCollection("config.test"); - - _client.dropDatabase(nsGetDB(collName())); - _client.insert(collName(), - BSON("hello" - << "world")); - _client.dropCollection(collName()); - - _shardId = "shard0000"; - - // Add dummy shard to config DB - _client.insert(ShardType::ConfigNS, - BSON(ShardType::name() - << _shardId << ShardType::host() - << ConnectionString(HostAndPort("$hostFooBar:27017")).toString())); - - // Create an index so that diffing works correctly, otherwise no cursors from S&O - ASSERT_OK(dbtests::createIndex(&_txn, - ChunkType::ConfigNS, - BSON(ChunkType::ns() << 1 << // br - ChunkType::DEPRECATED_lastmod() << 1))); - } - - virtual ~ChunkManagerTest() { - // Reset the redirection - ConnectionString::setConnectionHook(NULL); - } - - string collName() { - return "foo.bar"; - } - - virtual DBClientBase* connect(const ConnectionString& connStr, - string& errmsg, - double socketTimeout) { - // Note - must be new, since it gets owned elsewhere - return new CustomDirectClient(&_txn); - } - - -protected: - OperationContextImpl _txn; - CustomDirectClient _client; - ShardId _shardId; -}; - -// -// Tests creating a new chunk manager with random split points. Creating chunks on multiple shards -// is not tested here since there are unresolved race conditions there and probably should be -// avoided if at all possible. -// -class ChunkManagerCreateFullTest : public ChunkManagerTest { -public: - static const int numSplitPoints = 100; - - void genRandomSplitPoints(vector<int>* splitPoints) { - for (int i = 0; i < numSplitPoints; i++) { - splitPoints->push_back(rand(numSplitPoints * 10)); - } - } - - void genRandomSplitKeys(const string& keyName, vector<BSONObj>* splitKeys) { - vector<int> splitPoints; - genRandomSplitPoints(&splitPoints); - - for (vector<int>::iterator it = splitPoints.begin(); it != splitPoints.end(); ++it) { - splitKeys->push_back(BSON(keyName << *it)); - } - } - - // Uses a chunk manager to create chunks - void createChunks(const string& keyName) { - vector<BSONObj> splitKeys; - genRandomSplitKeys(keyName, &splitKeys); - - ShardKeyPattern shardKeyPattern(BSON(keyName << 1)); - ChunkManager manager(collName(), shardKeyPattern, false); - - manager.createFirstChunks(_shardId, &splitKeys, NULL); - } - - void run() { - string keyName = "_id"; - createChunks(keyName); - - unique_ptr<DBClientCursor> cursor = - _client.query(ChunkType::ConfigNS, QUERY(ChunkType::ns(collName()))); - - set<int> minorVersions; - OID epoch; - - // Check that all chunks were created with version 1|x with consistent epoch and unique - // minor versions - while (cursor->more()) { - BSONObj chunk = cursor->next(); - - ChunkVersion version = ChunkVersion::fromBSON(chunk, ChunkType::DEPRECATED_lastmod()); - - ASSERT(version.majorVersion() == 1); - ASSERT(version.isEpochSet()); - - if (!epoch.isSet()) - epoch = version.epoch(); - ASSERT(version.epoch() == epoch); - - ASSERT(minorVersions.find(version.minorVersion()) == minorVersions.end()); - minorVersions.insert(version.minorVersion()); - - ASSERT(chunk[ChunkType::shard()].String() == _shardId); - } - } -}; - -// -// Tests that chunks are loaded correctly from the db with no a-priori info and also that they can -// be reloaded on top of an old chunk manager with changes. -// -class ChunkManagerLoadBasicTest : public ChunkManagerCreateFullTest { -public: - void run() { - string keyName = "_id"; - createChunks(keyName); - int numChunks = - static_cast<int>(_client.count(ChunkType::ConfigNS, BSON(ChunkType::ns(collName())))); - - BSONObj firstChunk = _client.findOne(ChunkType::ConfigNS, BSONObj()).getOwned(); - - ChunkVersion version = ChunkVersion::fromBSON(firstChunk, ChunkType::DEPRECATED_lastmod()); - - // Make manager load existing chunks - CollectionType collType; - collType.setNs(NamespaceString{collName()}); - collType.setEpoch(version.epoch()); - collType.setUpdatedAt(jsTime()); - collType.setKeyPattern(BSON("_id" << 1)); - collType.setUnique(false); - collType.setDropped(false); - - ChunkManager manager(collType); - manager.loadExistingRanges(nullptr); - - ASSERT(manager.getVersion().epoch() == version.epoch()); - ASSERT(manager.getVersion().minorVersion() == (numChunks - 1)); - ASSERT(static_cast<int>(manager.getChunkMap().size()) == numChunks); - - // Modify chunks collection - BSONObjBuilder b; - ChunkVersion laterVersion = ChunkVersion(2, 1, version.epoch()); - laterVersion.addToBSON(b, ChunkType::DEPRECATED_lastmod()); - - _client.update(ChunkType::ConfigNS, BSONObj(), BSON("$set" << b.obj())); - - // Make new manager load chunk diff - ChunkManager newManager(manager.getns(), manager.getShardKeyPattern(), manager.isUnique()); - newManager.loadExistingRanges(&manager); - - ASSERT(newManager.getVersion().toLong() == laterVersion.toLong()); - ASSERT(newManager.getVersion().epoch() == laterVersion.epoch()); - ASSERT(static_cast<int>(newManager.getChunkMap().size()) == numChunks); - } -}; - -class All : public Suite { -public: - All() : Suite("sharding") {} - - void setupTests() { - add<ChunkManagerCreateFullTest>(); - add<ChunkManagerLoadBasicTest>(); - } -}; - -SuiteInstance<All> myall; - -} // namespace diff --git a/src/mongo/s/client/dbclient_multi_command.cpp b/src/mongo/s/client/dbclient_multi_command.cpp index ca4d79de8b1..3fc3dfabee8 100644 --- a/src/mongo/s/client/dbclient_multi_command.cpp +++ b/src/mongo/s/client/dbclient_multi_command.cpp @@ -30,7 +30,6 @@ #include "mongo/s/client/dbclient_multi_command.h" - #include "mongo/db/audit.h" #include "mongo/db/dbmessage.h" #include "mongo/db/wire_version.h" @@ -38,6 +37,7 @@ #include "mongo/rpc/request_builder_interface.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/write_ops/batched_command_request.h" +#include "mongo/stdx/memory.h" #include "mongo/util/net/message.h" namespace mongo { @@ -46,22 +46,6 @@ using std::unique_ptr; using std::deque; using std::string; -DBClientMultiCommand::PendingCommand::PendingCommand(const ConnectionString& endpoint, - StringData dbName, - const BSONObj& cmdObj) - : endpoint(endpoint), - dbName(dbName.toString()), - cmdObj(cmdObj), - conn(NULL), - status(Status::OK()) {} - -void DBClientMultiCommand::addCommand(const ConnectionString& endpoint, - StringData dbName, - const BSONObj& request) { - PendingCommand* command = new PendingCommand(endpoint, dbName, request); - _pendingCommands.push_back(command); -} - namespace { // @@ -133,44 +117,52 @@ static void recvAsCmd(DBClientBase* conn, Message* toRecv, BSONObj* result) { *result = reply->getCommandReply(); } +DBClientMultiCommand::DBClientMultiCommand() = default; + +DBClientMultiCommand::~DBClientMultiCommand() { + // Cleanup anything outstanding, do *not* return stuff to the pool, that might error + for (deque<PendingCommand*>::iterator it = _pendingCommands.begin(); + it != _pendingCommands.end(); + ++it) { + PendingCommand* command = *it; + delete command; + } + + _pendingCommands.clear(); +} + +void DBClientMultiCommand::addCommand(const ConnectionString& endpoint, + StringData dbName, + const BSONObj& request) { + PendingCommand* command = new PendingCommand(endpoint, dbName, request); + _pendingCommands.push_back(command); +} + void DBClientMultiCommand::sendAll() { for (deque<PendingCommand*>::iterator it = _pendingCommands.begin(); it != _pendingCommands.end(); ++it) { PendingCommand* command = *it; - dassert(NULL == command->conn); + dassert(!command->conn); try { dassert(command->endpoint.type() == ConnectionString::MASTER || command->endpoint.type() == ConnectionString::CUSTOM); - // TODO: Fix the pool up to take millis directly - int timeoutSecs = _timeoutMillis / 1000; - command->conn = shardConnectionPool.get(command->endpoint, timeoutSecs); + command->conn = stdx::make_unique<ShardConnection>(command->endpoint, ""); // Sanity check if we're sending a batch write that we're talking to a new-enough // server. massert(28563, str::stream() << "cannot send batch write operation to server " - << command->conn->toString(), - !isBatchWriteCommand(command->cmdObj) || hasBatchWriteFeature(command->conn)); + << command->conn->get()->toString(), + !isBatchWriteCommand(command->cmdObj) || + hasBatchWriteFeature(command->conn->get())); - sayAsCmd(command->conn, command->dbName, command->cmdObj); + sayAsCmd(command->conn->get(), command->dbName, command->cmdObj); } catch (const DBException& ex) { command->status = ex.toStatus(); - - if (NULL != command->conn) { - // Confusingly, the pool needs to know about failed connections so that it can - // invalidate other connections which might be bad. But if the connection - // doesn't seem bad, don't send it back, because we don't want to reuse it. - if (!command->conn->isFailed()) { - delete command->conn; - } else { - shardConnectionPool.release(command->endpoint.toString(), command->conn); - } - - command->conn = NULL; - } + command->conn.reset(); } } } @@ -187,56 +179,34 @@ Status DBClientMultiCommand::recvAny(ConnectionString* endpoint, BSONSerializabl if (!command->status.isOK()) return command->status; - dassert(NULL != command->conn); + dassert(command->conn); try { // Holds the data and BSONObj for the command result Message toRecv; BSONObj result; - recvAsCmd(command->conn, &toRecv, &result); - - shardConnectionPool.release(command->endpoint.toString(), command->conn); - command->conn = NULL; + recvAsCmd(command->conn->get(), &toRecv, &result); + command->conn->done(); + command->conn.reset(); string errMsg; if (!response->parseBSON(result, &errMsg) || !response->isValid(&errMsg)) { return Status(ErrorCodes::FailedToParse, errMsg); } } catch (const DBException& ex) { - // Confusingly, the pool needs to know about failed connections so that it can - // invalidate other connections which might be bad. But if the connection doesn't seem - // bad, don't send it back, because we don't want to reuse it. - if (!command->conn->isFailed()) { - delete command->conn; - } else { - shardConnectionPool.release(command->endpoint.toString(), command->conn); - } - command->conn = NULL; - + command->conn.reset(); return ex.toStatus(); } return Status::OK(); } -DBClientMultiCommand::~DBClientMultiCommand() { - // Cleanup anything outstanding, do *not* return stuff to the pool, that might error - for (deque<PendingCommand*>::iterator it = _pendingCommands.begin(); - it != _pendingCommands.end(); - ++it) { - PendingCommand* command = *it; - - if (NULL != command->conn) - delete command->conn; - delete command; - command = NULL; - } +DBClientMultiCommand::PendingCommand::PendingCommand(const ConnectionString& endpoint, + StringData dbName, + const BSONObj& cmdObj) + : endpoint(endpoint), dbName(dbName.toString()), cmdObj(cmdObj), status(Status::OK()) {} - _pendingCommands.clear(); -} +DBClientMultiCommand::PendingCommand::~PendingCommand() = default; -void DBClientMultiCommand::setTimeoutMillis(int milliSecs) { - _timeoutMillis = milliSecs; -} -} +} // namespace mongo diff --git a/src/mongo/s/client/dbclient_multi_command.h b/src/mongo/s/client/dbclient_multi_command.h index ab0f477d885..84f10a061e9 100644 --- a/src/mongo/s/client/dbclient_multi_command.h +++ b/src/mongo/s/client/dbclient_multi_command.h @@ -35,6 +35,8 @@ namespace mongo { +class ShardConnection; + /** * A DBClientMultiCommand uses the client driver (DBClientConnections) to send and recv * commands to different hosts in parallel. @@ -43,8 +45,7 @@ namespace mongo { */ class DBClientMultiCommand : public MultiCommandDispatch { public: - DBClientMultiCommand() : _timeoutMillis(0) {} - + DBClientMultiCommand(); ~DBClientMultiCommand(); void addCommand(const ConnectionString& endpoint, @@ -57,12 +58,11 @@ public: Status recvAny(ConnectionString* endpoint, BSONSerializable* response) override; - void setTimeoutMillis(int milliSecs); - private: // All info associated with an pre- or in-flight command struct PendingCommand { PendingCommand(const ConnectionString& endpoint, StringData dbName, const BSONObj& cmdObj); + ~PendingCommand(); // What to send const ConnectionString endpoint; @@ -70,7 +70,7 @@ private: const BSONObj cmdObj; // Where to send it - DBClientBase* conn; + std::unique_ptr<ShardConnection> conn; // If anything goes wrong Status status; @@ -78,6 +78,6 @@ private: typedef std::deque<PendingCommand*> PendingQueue; PendingQueue _pendingCommands; - int _timeoutMillis; }; -} + +} // namespace mongo diff --git a/src/mongo/s/client/shard_connection.cpp b/src/mongo/s/client/shard_connection.cpp index 81d64dfb3b8..c6d765afb5a 100644 --- a/src/mongo/s/client/shard_connection.cpp +++ b/src/mongo/s/client/shard_connection.cpp @@ -409,12 +409,19 @@ DBConnectionPool shardConnectionPool; // Different between mongos and mongod void usingAShardConnection(const string& addr); - ShardConnection::ShardConnection(const ConnectionString& connectionString, const string& ns, std::shared_ptr<ChunkManager> manager) - : _cs(connectionString), _ns(ns), _manager(manager) { - _init(); + : _cs(connectionString), _ns(ns), _manager(manager), _finishedInit(false) { + invariant(_cs.isValid()); + + // Make sure we specified a manager for the correct namespace + if (_ns.size() && _manager) { + invariant(_manager->getns() == _ns); + } + + _conn = ClientConnections::threadInstance()->get(_cs.toString(), _ns); + usingAShardConnection(_cs.toString()); } ShardConnection::~ShardConnection() { @@ -437,22 +444,12 @@ ShardConnection::~ShardConnection() { } } -void ShardConnection::_init() { - invariant(_cs.isValid()); - _conn = ClientConnections::threadInstance()->get(_cs.toString(), _ns); - _finishedInit = false; - usingAShardConnection(_cs.toString()); -} - void ShardConnection::_finishInit() { if (_finishedInit) return; _finishedInit = true; if (versionManager.isVersionableCB(_conn)) { - // Make sure we specified a manager for the correct namespace - if (_ns.size() && _manager) - verify(_manager->getns() == _ns); _setVersion = versionManager.checkShardVersionCB(this, false, 1); } else { // Make sure we didn't specify a manager for a non-versionable connection (i.e. config) diff --git a/src/mongo/s/client/shard_connection.h b/src/mongo/s/client/shard_connection.h index c6731945ec1..8882961bb1c 100644 --- a/src/mongo/s/client/shard_connection.h +++ b/src/mongo/s/client/shard_connection.h @@ -123,13 +123,11 @@ public: static void forgetNS(const std::string& ns); private: - void _init(); void _finishInit(); const ConnectionString _cs; const std::string _ns; - - std::shared_ptr<ChunkManager> _manager; + const std::shared_ptr<ChunkManager> _manager; bool _finishedInit; diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp index 156f736413a..17b2abfd7bc 100644 --- a/src/mongo/s/commands/commands_public.cpp +++ b/src/mongo/s/commands/commands_public.cpp @@ -33,7 +33,6 @@ #include "mongo/platform/basic.h" #include "mongo/client/connpool.h" -#include "mongo/client/parallel.h" #include "mongo/db/auth/action_set.h" #include "mongo/db/auth/action_type.h" #include "mongo/db/auth/authorization_manager.h" diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp index 79d27a87e0f..4a1653e4fcb 100644 --- a/src/mongo/s/server.cpp +++ b/src/mongo/s/server.cpp @@ -108,11 +108,6 @@ bool inShutdown() { return dbexitCalled; } -bool haveLocalShardingInfo(Client* client, const string& ns) { - verify(0); - return false; -} - static BSONObj buildErrReply(const DBException& ex) { BSONObjBuilder errB; errB.append("$err", ex.what()); @@ -213,11 +208,12 @@ static Status initializeSharding(bool doUpgrade) { if (!status.isOK()) { return status; } + return Status::OK(); } static ExitCode runMongosServer(bool doUpgrade) { - setThreadName("mongosMain"); + Client::initThread("mongosMain"); printShardingVersionInfo(false); // Add sharding hooks to both connection pools - ShardingConnectionHook includes auth hooks diff --git a/src/mongo/s/version_manager.cpp b/src/mongo/s/version_manager.cpp index d3cb5ddc875..8faed905b60 100644 --- a/src/mongo/s/version_manager.cpp +++ b/src/mongo/s/version_manager.cpp @@ -196,9 +196,11 @@ bool initShardVersionEmptyNS(DBClientBase* conn_in) { conn = getVersionable(conn_in); dassert(conn); // errors thrown above - // Check to see if we've already initialized this connection - if (connectionShardStatus.hasAnySequenceSet(conn)) + // Check to see if we've already initialized this connection. This avoids sending + // setShardVersion multiple times. + if (connectionShardStatus.hasAnySequenceSet(conn)) { return false; + } // Check to see if this is actually a shard and not a single config server // NOTE: Config servers are registered only by the name "config" in the shard cache, not diff --git a/src/mongo/shell/shardingtest.js b/src/mongo/shell/shardingtest.js index a24949f58c3..79b7049fc9a 100644 --- a/src/mongo/shell/shardingtest.js +++ b/src/mongo/shell/shardingtest.js @@ -1137,9 +1137,16 @@ ShardingTest.prototype.stopMongos = function(n) { }; /** - * Restarts a previously stopped mongos using the same parameter as before. + * Kills the mongod with index n. + */ +ShardingTest.prototype.stopMongod = function(n) { + MongoRunner.stopMongod(this['d' + n].port); +}; + +/** + * Restarts a previously stopped mongos using the same parameters as before. * - * Warning: Overwrites the old s (if n = 0) and sn member variables + * Warning: Overwrites the old s (if n = 0) and sn member variables. */ ShardingTest.prototype.restartMongos = function(n) { this.stopMongos(n); @@ -1152,6 +1159,22 @@ ShardingTest.prototype.restartMongos = function(n) { }; /** + * Restarts a previously stopped mongod using the same parameters as before. + * + * Warning: Overwrites the old dn member variables. + */ +ShardingTest.prototype.restartMongod = function(n) { + this.stopMongod(n); + + var cmdLine = this['d' + n].commandLine; + cmdLine['restart'] = true; + + var newConn = MongoRunner.runMongod(cmdLine); + + this['d' + n] = newConn; +}; + +/** * Helper method for setting primary shard of a database and making sure that it was successful. * Note: first mongos needs to be up. */ |