diff options
author | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2017-01-19 17:20:12 -0500 |
---|---|---|
committer | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2017-01-19 17:20:12 -0500 |
commit | d3e67186d1f9c633e8e69ebb7bf2418d3850688a (patch) | |
tree | 570532480170d0bc8bc757ea9f517c1ac987c18d | |
parent | 9edfc4c8ba273d54ecdc31c1fc0eb8c6a42ccbc4 (diff) | |
download | mongo-d3e67186d1f9c633e8e69ebb7bf2418d3850688a.tar.gz |
Revert "SERVER-26791 Shard metadata commands should perform partial refresh as much as possible"
This reverts commit 6add2c82bacecd5f54613ebf4be1553f3b046cbc.
-rw-r--r-- | buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml | 1 | ||||
-rw-r--r-- | src/mongo/db/s/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_state.cpp | 104 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_state_test.cpp | 381 |
4 files changed, 78 insertions, 411 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml index 9213045e0e6..462d8cff458 100644 --- a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml +++ b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml @@ -66,7 +66,6 @@ selector: - jstests/sharding/listDatabases.js - jstests/sharding/bulk_insert.js - jstests/sharding/printShardingStatus.js - - jstests/sharding/cmds_epoch_mismatch.js # Balancer writes (direct write to config database with no retries) - jstests/sharding/convert_to_and_from_sharded.js - jstests/sharding/remove2.js diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index c82b0701de3..eac2ad60283 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -226,9 +226,6 @@ env.CppUnitTest( 'sharding_state_test.cpp', ], LIBDEPS=[ - '$BUILD_DIR/mongo/db/query/query_request', - '$BUILD_DIR/mongo/s/catalog/dist_lock_manager_mock', - '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl', '$BUILD_DIR/mongo/s/catalog/sharding_catalog_mock', '$BUILD_DIR/mongo/s/sharding_mongod_test_fixture', ], diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp index 258f5191b9f..ab9ca2c7dd7 100644 --- a/src/mongo/db/s/sharding_state.cpp +++ b/src/mongo/db/s/sharding_state.cpp @@ -273,24 +273,41 @@ Status ShardingState::onStaleShardVersion(OperationContext* txn, } } - auto refreshStatusAndVersion = - _refreshMetadata(txn, nss, (currentMetadata ? currentMetadata.getMetadata() : nullptr)); - return refreshStatusAndVersion.getStatus(); -} + // At the first attempt try to use the currently loaded metadata and on subsequent attempts use + // the complete metadata + int numRefreshAttempts = 0; + + while (true) { + numRefreshAttempts++; + + auto refreshStatusAndVersion = + _refreshMetadata(txn, nss, (currentMetadata ? currentMetadata.getMetadata() : nullptr)); + if (refreshStatusAndVersion.isOK()) { + LOG(1) << "Successfully refreshed metadata for " << nss.ns() << " to " + << refreshStatusAndVersion.getValue(); + return Status::OK(); + } -Status ShardingState::refreshMetadataNow(OperationContext* txn, - const NamespaceString& nss, - ChunkVersion* latestShardVersion) { - ScopedCollectionMetadata currentMetadata; + if (refreshStatusAndVersion == ErrorCodes::RemoteChangeDetected && + numRefreshAttempts < kMaxNumMetadataRefreshAttempts) { + currentMetadata = ScopedCollectionMetadata(); - { - AutoGetCollection autoColl(txn, nss, MODE_IS); + log() << "Refresh failed and will be retried as full reload " + << refreshStatusAndVersion.getStatus(); + continue; + } - currentMetadata = CollectionShardingState::get(txn, nss)->getMetadata(); + return refreshStatusAndVersion.getStatus(); } - auto refreshLatestShardVersionStatus = - _refreshMetadata(txn, nss, currentMetadata.getMetadata()); + + MONGO_UNREACHABLE; +} + +Status ShardingState::refreshMetadataNow(OperationContext* txn, + const NamespaceString& nss, + ChunkVersion* latestShardVersion) { + auto refreshLatestShardVersionStatus = _refreshMetadata(txn, nss, nullptr); if (!refreshLatestShardVersionStatus.isOK()) { return refreshLatestShardVersionStatus.getStatus(); } @@ -651,34 +668,24 @@ StatusWith<ChunkVersion> ShardingState::_refreshMetadata( } } - Status status = {ErrorCodes::InternalError, "metadata refresh not performed"}; + // The _configServerTickets serializes this process such that only a small number of threads can + // try to refresh at the same time in order to avoid overloading the config server + _configServerTickets.waitForTicket(); + TicketHolderReleaser needTicketFrom(&_configServerTickets); + Timer t; - int numAttempts = 0; - std::unique_ptr<CollectionMetadata> remoteMetadata; - - do { - // The _configServerTickets serializes this process such that only a small number of threads - // can try to refresh at the same time in order to avoid overloading the config server. - _configServerTickets.waitForTicket(); - TicketHolderReleaser needTicketFrom(&_configServerTickets); - - if (status == ErrorCodes::RemoteChangeDetected) { - metadataForDiff = nullptr; - log() << "Refresh failed and will be retried as full reload " << status; - } - log() << "MetadataLoader loading chunks for " << nss.ns() << " based on: " - << (metadataForDiff ? metadataForDiff->getCollVersion().toString() : "(empty)"); + log() << "MetadataLoader loading chunks for " << nss.ns() << " based on: " + << (metadataForDiff ? metadataForDiff->getCollVersion().toString() : "(empty)"); + + std::unique_ptr<CollectionMetadata> remoteMetadata(stdx::make_unique<CollectionMetadata>()); - remoteMetadata = stdx::make_unique<CollectionMetadata>(); - status = MetadataLoader::makeCollectionMetadata(txn, - grid.catalogClient(txn), - nss.ns(), - getShardName(), - metadataForDiff, - remoteMetadata.get()); - } while (status == ErrorCodes::RemoteChangeDetected && - ++numAttempts < kMaxNumMetadataRefreshAttempts); + Status status = MetadataLoader::makeCollectionMetadata(txn, + grid.catalogClient(txn), + nss.ns(), + getShardName(), + metadataForDiff, + remoteMetadata.get()); if (!status.isOK() && status != ErrorCodes::NamespaceNotFound) { warning() << "MetadataLoader failed after " << t.millis() << " ms" @@ -693,23 +700,24 @@ StatusWith<ChunkVersion> ShardingState::_refreshMetadata( auto css = CollectionShardingState::get(txn, nss); - if (!status.isOK()) { - invariant(status == ErrorCodes::NamespaceNotFound); - css->refreshMetadata(txn, nullptr); + if (status.isOK()) { + css->refreshMetadata(txn, std::move(remoteMetadata)); + + auto metadata = css->getMetadata(); - log() << "MetadataLoader took " << t.millis() << " ms and did not find the namespace"; + log() << "MetadataLoader took " << t.millis() << " ms and found version " + << metadata->getCollVersion(); - return ChunkVersion::UNSHARDED(); + return metadata->getShardVersion(); } - css->refreshMetadata(txn, std::move(remoteMetadata)); + invariant(status == ErrorCodes::NamespaceNotFound); - auto metadata = css->getMetadata(); + css->refreshMetadata(txn, nullptr); - log() << "MetadataLoader took " << t.millis() << " ms and found version " - << metadata->getCollVersion(); + log() << "MetadataLoader took " << t.millis() << " ms and did not find the namespace"; - return metadata->getShardVersion(); + return ChunkVersion::UNSHARDED(); } StatusWith<ScopedRegisterDonateChunk> ShardingState::registerDonateChunk( diff --git a/src/mongo/db/s/sharding_state_test.cpp b/src/mongo/db/s/sharding_state_test.cpp index e4953f3488c..0156058676d 100644 --- a/src/mongo/db/s/sharding_state_test.cpp +++ b/src/mongo/db/s/sharding_state_test.cpp @@ -33,17 +33,12 @@ #include "mongo/db/dbdirectclient.h" #include "mongo/db/jsobj.h" #include "mongo/db/namespace_string.h" -#include "mongo/db/query/query_request.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/s/type_shard_identity.h" #include "mongo/db/server_options.h" #include "mongo/db/service_context_noop.h" #include "mongo/db/storage/storage_options.h" -#include "mongo/s/catalog/dist_lock_manager_mock.h" -#include "mongo/s/catalog/sharding_catalog_client_impl.h" -#include "mongo/s/catalog/type_chunk.h" -#include "mongo/s/catalog/type_collection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/sharding_mongod_test_fixture.h" @@ -59,42 +54,11 @@ public: return &_shardingState; } - std::string shardName() const { - return _shardName.toString(); - } - - void setupCollectionMetadata(const NamespaceString& nss, - const OID& epoch, - const std::vector<BSONObj>& initChunks) { - auto future = launchAsync([this, &nss] { - ChunkVersion latestShardVersion; - Client::initThreadIfNotAlready(); - ASSERT_OK( - shardingState()->refreshMetadataNow(operationContext(), nss, &latestShardVersion)); - }); - - ChunkVersion initVersion(1, 0, epoch); - onFindCommand([&nss, &initVersion](const RemoteCommandRequest&) { - CollectionType coll; - coll.setNs(nss); - coll.setUpdatedAt(Date_t()); - coll.setEpoch(initVersion.epoch()); - coll.setKeyPattern(BSON("x" << 1)); - return std::vector<BSONObj>{coll.toBSON()}; - }); - - onFindCommand([&initChunks](const RemoteCommandRequest&) { return initChunks; }); - - future.timed_get(kFutureTimeout); - } - protected: // Used to write to set up local collections before exercising server logic. std::unique_ptr<DBDirectClient> _dbDirectClient; void setUp() override { - _shardName = ShardId("a"); - serverGlobalParams.clusterRole = ClusterRole::None; ShardingMongodTestFixture::setUp(); @@ -112,7 +76,6 @@ protected: auto configTargeter = RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter()); configTargeter->setConnectionStringReturnValue(configConnStr); - configTargeter->setFindHostReturnValue(configConnStr.getServers()[0]); return Status::OK(); }); @@ -133,32 +96,20 @@ protected: ShardingMongodTestFixture::tearDown(); } - std::unique_ptr<DistLockManager> makeDistLockManager( - std::unique_ptr<DistLockCatalog> distLockCatalog) override { - return stdx::make_unique<DistLockManagerMock>(nullptr); - } - - std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient( - std::unique_ptr<DistLockManager> distLockManager) override { - invariant(distLockManager); - return stdx::make_unique<ShardingCatalogClientImpl>(std::move(distLockManager)); - } - private: ShardingState _shardingState; - ShardId _shardName; }; TEST_F(ShardingStateTest, ValidShardIdentitySucceeds) { ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(shardName()); + shardIdentity.setShardName("a"); shardIdentity.setClusterId(OID::gen()); ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); ASSERT_TRUE(shardingState()->enabled()); - ASSERT_EQ(shardName(), shardingState()->getShardName()); + ASSERT_EQ("a", shardingState()->getShardName()); ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(operationContext()).toString()); } @@ -166,7 +117,7 @@ TEST_F(ShardingStateTest, InitWhilePreviouslyInErrorStateWillStayInErrorState) { ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(shardName()); + shardIdentity.setShardName("a"); shardIdentity.setClusterId(OID::gen()); shardingState()->setGlobalInitMethodForTest( @@ -201,7 +152,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) { ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(shardName()); + shardIdentity.setShardName("a"); shardIdentity.setClusterId(clusterID); ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); @@ -209,7 +160,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) { ShardIdentityType shardIdentity2; shardIdentity2.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity2.setShardName(shardName()); + shardIdentity2.setShardName("a"); shardIdentity2.setClusterId(clusterID); shardingState()->setGlobalInitMethodForTest( @@ -220,7 +171,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) { ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity2)); ASSERT_TRUE(shardingState()->enabled()); - ASSERT_EQ(shardName(), shardingState()->getShardName()); + ASSERT_EQ("a", shardingState()->getShardName()); ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(operationContext()).toString()); } @@ -229,7 +180,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) { ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(shardName()); + shardIdentity.setShardName("a"); shardIdentity.setClusterId(clusterID); ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); @@ -237,7 +188,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) { ShardIdentityType shardIdentity2; shardIdentity2.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "b:2,c:3", "config")); - shardIdentity2.setShardName(shardName()); + shardIdentity2.setShardName("a"); shardIdentity2.setClusterId(clusterID); shardingState()->setGlobalInitMethodForTest( @@ -248,7 +199,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) { ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity2)); ASSERT_TRUE(shardingState()->enabled()); - ASSERT_EQ(shardName(), shardingState()->getShardName()); + ASSERT_EQ("a", shardingState()->getShardName()); ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(operationContext()).toString()); } @@ -257,7 +208,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentReplSetNameFails) { ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(shardName()); + shardIdentity.setShardName("a"); shardIdentity.setClusterId(clusterID); ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); @@ -265,7 +216,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentReplSetNameFails) { ShardIdentityType shardIdentity2; shardIdentity2.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "configRS")); - shardIdentity2.setShardName(shardName()); + shardIdentity2.setShardName("a"); shardIdentity2.setClusterId(clusterID); shardingState()->setGlobalInitMethodForTest( @@ -277,7 +228,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentReplSetNameFails) { ASSERT_EQ(ErrorCodes::InconsistentShardIdentity, status); ASSERT_TRUE(shardingState()->enabled()); - ASSERT_EQ(shardName(), shardingState()->getShardName()); + ASSERT_EQ("a", shardingState()->getShardName()); ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(operationContext()).toString()); } @@ -286,7 +237,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentShardNameFails) { ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(shardName()); + shardIdentity.setShardName("a"); shardIdentity.setClusterId(clusterID); ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); @@ -306,7 +257,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentShardNameFails) { ASSERT_EQ(ErrorCodes::InconsistentShardIdentity, status); ASSERT_TRUE(shardingState()->enabled()); - ASSERT_EQ(shardName(), shardingState()->getShardName()); + ASSERT_EQ("a", shardingState()->getShardName()); ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(operationContext()).toString()); } @@ -314,7 +265,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentClusterIdFails) { ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(shardName()); + shardIdentity.setShardName("a"); shardIdentity.setClusterId(OID::gen()); ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); @@ -322,7 +273,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentClusterIdFails) { ShardIdentityType shardIdentity2; shardIdentity2.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity2.setShardName(shardName()); + shardIdentity2.setShardName("a"); shardIdentity2.setClusterId(OID::gen()); shardingState()->setGlobalInitMethodForTest( @@ -334,7 +285,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentClusterIdFails) { ASSERT_EQ(ErrorCodes::InconsistentShardIdentity, status); ASSERT_TRUE(shardingState()->enabled()); - ASSERT_EQ(shardName(), shardingState()->getShardName()); + ASSERT_EQ("a", shardingState()->getShardName()); ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(operationContext()).toString()); } @@ -376,7 +327,7 @@ TEST_F(ShardingStateTest, ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(shardName()); + shardIdentity.setShardName("a"); shardIdentity.setClusterId(OID::gen()); ASSERT_OK(shardIdentity.validate()); serverGlobalParams.overrideShardIdentity = shardIdentity.toBSON(); @@ -424,7 +375,7 @@ TEST_F(ShardingStateTest, ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(shardName()); + shardIdentity.setShardName("a"); shardIdentity.setClusterId(OID::gen()); ASSERT_OK(shardIdentity.validate()); serverGlobalParams.overrideShardIdentity = shardIdentity.toBSON(); @@ -464,7 +415,7 @@ TEST_F(ShardingStateTest, ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(shardName()); + shardIdentity.setShardName("a"); shardIdentity.setClusterId(OID::gen()); ASSERT_OK(shardIdentity.validate()); serverGlobalParams.overrideShardIdentity = shardIdentity.toBSON(); @@ -534,7 +485,7 @@ TEST_F(ShardingStateTest, ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(shardName()); + shardIdentity.setShardName("a"); shardIdentity.setClusterId(OID::gen()); ASSERT_OK(shardIdentity.validate()); BSONObj validShardIdentity = shardIdentity.toBSON(); @@ -594,7 +545,7 @@ TEST_F(ShardingStateTest, ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(shardName()); + shardIdentity.setShardName("a"); shardIdentity.setClusterId(OID::gen()); ASSERT_OK(shardIdentity.validate()); BSONObj validShardIdentity = shardIdentity.toBSON(); @@ -609,293 +560,5 @@ TEST_F(ShardingStateTest, ASSERT_FALSE(swShardingInitialized.getValue()); } -TEST_F(ShardingStateTest, MetadataRefreshShouldUseDiffQuery) { - ShardIdentityType shardIdentity; - shardIdentity.setConfigsvrConnString( - ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(shardName()); - shardIdentity.setClusterId(OID::gen()); - - ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); - - const NamespaceString nss("test.user"); - const OID initEpoch(OID::gen()); - - { - ChunkType chunk; - chunk.setNS(nss.ns()); - chunk.setMin(BSON("x" << 0)); - chunk.setMax(BSON("x" << 10)); - chunk.setShard(ShardId(shardName())); - chunk.setVersion(ChunkVersion(2, 0, initEpoch)); - setupCollectionMetadata(nss, initEpoch, std::vector<BSONObj>{chunk.toBSON()}); - } - - const ChunkVersion newVersion(3, 0, initEpoch); - auto future = launchAsync([&] { - Client::initThreadIfNotAlready(); - ASSERT_OK(shardingState()->onStaleShardVersion(operationContext(), nss, newVersion)); - }); - - onFindCommand([&nss, &initEpoch](const RemoteCommandRequest&) { - CollectionType coll; - coll.setNs(nss); - coll.setUpdatedAt(Date_t()); - coll.setEpoch(initEpoch); - coll.setKeyPattern(BSON("x" << 1)); - return std::vector<BSONObj>{coll.toBSON()}; - }); - - onFindCommand([this, &nss, &initEpoch](const RemoteCommandRequest& request) { - auto diffQueryStatus = QueryRequest::makeFromFindCommand(nss, request.cmdObj, false); - ASSERT_OK(diffQueryStatus.getStatus()); - - auto diffQuery = std::move(diffQueryStatus.getValue()); - ASSERT_BSONOBJ_EQ(BSON("ns" << nss.ns() << "lastmod" << BSON("$gte" << Timestamp(2, 0))), - diffQuery->getFilter()); - - ChunkType chunk; - chunk.setNS(nss.ns()); - chunk.setMin(BSON("x" << 10)); - chunk.setMax(BSON("x" << 20)); - chunk.setShard(ShardId(shardName())); - chunk.setVersion(ChunkVersion(3, 10, initEpoch)); - return std::vector<BSONObj>{chunk.toBSON()}; - }); - - future.timed_get(kFutureTimeout); -} - -/** - * Test where the epoch changed right before the chunk diff query. - */ -TEST_F(ShardingStateTest, MetadataRefreshShouldUseFullQueryOnEpochMismatch) { - ShardIdentityType shardIdentity; - shardIdentity.setConfigsvrConnString( - ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(shardName()); - shardIdentity.setClusterId(OID::gen()); - - ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); - - const NamespaceString nss("test.user"); - const OID initEpoch(OID::gen()); - - { - ChunkType chunk; - chunk.setNS(nss.ns()); - chunk.setMin(BSON("x" << 0)); - chunk.setMax(BSON("x" << 10)); - chunk.setShard(ShardId(shardName())); - chunk.setVersion(ChunkVersion(2, 0, initEpoch)); - setupCollectionMetadata(nss, initEpoch, std::vector<BSONObj>{chunk.toBSON()}); - } - - - auto future = launchAsync([&] { - Client::initThreadIfNotAlready(); - ASSERT_OK(shardingState()->onStaleShardVersion( - operationContext(), nss, ChunkVersion(3, 0, initEpoch))); - }); - - onFindCommand([&nss, &initEpoch](const RemoteCommandRequest&) { - CollectionType coll; - coll.setNs(nss); - coll.setUpdatedAt(Date_t()); - coll.setEpoch(initEpoch); - coll.setKeyPattern(BSON("x" << 1)); - return std::vector<BSONObj>{coll.toBSON()}; - }); - - // Now when the diff query is performed, it will get chunks with a different epoch. - const ChunkVersion newVersion(3, 0, OID::gen()); - onFindCommand([this, &nss, &newVersion](const RemoteCommandRequest& request) { - auto diffQueryStatus = QueryRequest::makeFromFindCommand(nss, request.cmdObj, false); - ASSERT_OK(diffQueryStatus.getStatus()); - - auto diffQuery = std::move(diffQueryStatus.getValue()); - ASSERT_BSONOBJ_EQ(BSON("ns" << nss.ns() << "lastmod" << BSON("$gte" << Timestamp(2, 0))), - diffQuery->getFilter()); - - ChunkType chunk; - chunk.setNS(nss.ns()); - chunk.setMin(BSON("x" << 10)); - chunk.setMax(BSON("x" << 20)); - chunk.setShard(ShardId(shardName())); - chunk.setVersion(ChunkVersion(3, 10, newVersion.epoch())); - return std::vector<BSONObj>{chunk.toBSON()}; - }); - - // Retry the refresh again. Now doing a full reload. - - onFindCommand([&nss, &newVersion](const RemoteCommandRequest&) { - CollectionType coll; - coll.setNs(nss); - coll.setUpdatedAt(Date_t()); - coll.setEpoch(newVersion.epoch()); - coll.setKeyPattern(BSON("x" << 1)); - return std::vector<BSONObj>{coll.toBSON()}; - }); - - onFindCommand([this, &nss, &newVersion](const RemoteCommandRequest& request) { - auto diffQueryStatus = QueryRequest::makeFromFindCommand(nss, request.cmdObj, false); - ASSERT_OK(diffQueryStatus.getStatus()); - - auto diffQuery = std::move(diffQueryStatus.getValue()); - ASSERT_BSONOBJ_EQ(BSON("ns" << nss.ns() << "lastmod" << BSON("$gte" << Timestamp(0, 0))), - diffQuery->getFilter()); - - ChunkType chunk; - chunk.setNS(nss.ns()); - chunk.setMin(BSON("x" << 10)); - chunk.setMax(BSON("x" << 20)); - chunk.setShard(ShardId(shardName())); - chunk.setVersion(ChunkVersion(3, 10, newVersion.epoch())); - return std::vector<BSONObj>{chunk.toBSON()}; - }); - - future.timed_get(kFutureTimeout); -} - -TEST_F(ShardingStateTest, FullMetadataOnEpochMismatchShouldStopAfterMaxRetries) { - ShardIdentityType shardIdentity; - shardIdentity.setConfigsvrConnString( - ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(shardName()); - shardIdentity.setClusterId(OID::gen()); - - ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); - - const NamespaceString nss("test.user"); - const OID initEpoch(OID::gen()); - - { - ChunkType chunk; - chunk.setNS(nss.ns()); - chunk.setMin(BSON("x" << 0)); - chunk.setMax(BSON("x" << 10)); - chunk.setShard(ShardId(shardName())); - chunk.setVersion(ChunkVersion(2, 0, initEpoch)); - setupCollectionMetadata(nss, initEpoch, std::vector<BSONObj>{chunk.toBSON()}); - } - - - auto future = launchAsync([&] { - Client::initThreadIfNotAlready(); - auto status = shardingState()->onStaleShardVersion( - operationContext(), nss, ChunkVersion(3, 0, initEpoch)); - ASSERT_EQ(ErrorCodes::RemoteChangeDetected, status); - }); - - OID lastEpoch(initEpoch); - OID nextEpoch(OID::gen()); - for (int tries = 0; tries < 3; tries++) { - onFindCommand([&nss, &lastEpoch](const RemoteCommandRequest&) { - CollectionType coll; - coll.setNs(nss); - coll.setUpdatedAt(Date_t()); - coll.setEpoch(lastEpoch); - coll.setKeyPattern(BSON("x" << 1)); - return std::vector<BSONObj>{coll.toBSON()}; - }); - - onFindCommand([this, &nss, &nextEpoch, tries](const RemoteCommandRequest& request) { - auto diffQueryStatus = QueryRequest::makeFromFindCommand(nss, request.cmdObj, false); - ASSERT_OK(diffQueryStatus.getStatus()); - - auto diffQuery = std::move(diffQueryStatus.getValue()); - Timestamp expectedLastMod = (tries == 0) ? Timestamp(2, 0) : Timestamp(0, 0); - ASSERT_BSONOBJ_EQ( - BSON("ns" << nss.ns() << "lastmod" << BSON("$gte" << expectedLastMod)), - diffQuery->getFilter()); - - ChunkType chunk; - chunk.setNS(nss.ns()); - chunk.setMin(BSON("x" << 10)); - chunk.setMax(BSON("x" << 20)); - chunk.setShard(ShardId(shardName())); - chunk.setVersion(ChunkVersion(3, 10, nextEpoch)); - return std::vector<BSONObj>{chunk.toBSON()}; - }); - - lastEpoch = nextEpoch; - nextEpoch = OID::gen(); - } - - future.timed_get(kFutureTimeout); -} - -TEST_F(ShardingStateTest, MetadataRefreshShouldBeOkWhenCollectionWasDropped) { - ShardIdentityType shardIdentity; - shardIdentity.setConfigsvrConnString( - ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(shardName()); - shardIdentity.setClusterId(OID::gen()); - - ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); - - const NamespaceString nss("test.user"); - const OID initEpoch(OID::gen()); - - { - ChunkType chunk; - chunk.setNS(nss.ns()); - chunk.setMin(BSON("x" << 0)); - chunk.setMax(BSON("x" << 10)); - chunk.setShard(ShardId(shardName())); - chunk.setVersion(ChunkVersion(2, 0, initEpoch)); - setupCollectionMetadata(nss, initEpoch, std::vector<BSONObj>{chunk.toBSON()}); - } - - const ChunkVersion newVersion(3, 0, initEpoch); - auto future = launchAsync([&] { - Client::initThreadIfNotAlready(); - ASSERT_OK(shardingState()->onStaleShardVersion(operationContext(), nss, newVersion)); - }); - - onFindCommand([&nss, &initEpoch](const RemoteCommandRequest&) { - CollectionType coll; - coll.setNs(nss); - coll.setUpdatedAt(Date_t()); - coll.setEpoch(initEpoch); - coll.setKeyPattern(BSON("x" << 1)); - coll.setDropped(true); - return std::vector<BSONObj>{coll.toBSON()}; - }); - - future.timed_get(kFutureTimeout); -} - -TEST_F(ShardingStateTest, MetadataRefreshShouldNotRetryOtherTypesOfError) { - ShardIdentityType shardIdentity; - shardIdentity.setConfigsvrConnString( - ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(shardName()); - shardIdentity.setClusterId(OID::gen()); - - ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); - - const NamespaceString nss("test.user"); - const OID initEpoch(OID::gen()); - - { - ChunkType chunk; - chunk.setNS(nss.ns()); - chunk.setMin(BSON("x" << 0)); - chunk.setMax(BSON("x" << 10)); - chunk.setShard(ShardId(shardName())); - chunk.setVersion(ChunkVersion(2, 0, initEpoch)); - setupCollectionMetadata(nss, initEpoch, std::vector<BSONObj>{chunk.toBSON()}); - } - - auto configTargeter = - RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter()); - configTargeter->setFindHostReturnValue({ErrorCodes::HostNotFound, "host erased by test"}); - - auto status = shardingState()->onStaleShardVersion( - operationContext(), nss, ChunkVersion(3, 0, initEpoch)); - ASSERT_EQ(ErrorCodes::HostNotFound, status); -} - } // namespace } // namespace mongo |