diff options
author | Randolph Tan <randolph@10gen.com> | 2016-12-08 15:24:22 -0500 |
---|---|---|
committer | Randolph Tan <randolph@10gen.com> | 2017-01-20 10:24:44 -0500 |
commit | 774514cfad04866b42a1deb27b48488dec0f7520 (patch) | |
tree | 79001e110a689d8a819b1e28c97c53b74d2cb051 | |
parent | 098ef62876fcb6ac6fe5cd26201704c308bd5fea (diff) | |
download | mongo-774514cfad04866b42a1deb27b48488dec0f7520.tar.gz |
SERVER-26791 Shard metadata commands should perform partial refresh as much as possible
(cherry picked from commit 6add2c82bacecd5f54613ebf4be1553f3b046cbc)
-rw-r--r-- | src/mongo/db/s/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_state.cpp | 104 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_state_test.cpp | 382 |
3 files changed, 411 insertions, 78 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 53e74c1d5e3..cafeabe4537 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -210,6 +210,9 @@ env.CppUnitTest( 'sharding_state_test.cpp', ], LIBDEPS=[ + '$BUILD_DIR/mongo/db/query/query_request', + '$BUILD_DIR/mongo/s/catalog/dist_lock_manager_mock', + '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl', '$BUILD_DIR/mongo/s/catalog/sharding_catalog_mock', '$BUILD_DIR/mongo/s/sharding_mongod_test_fixture', ], diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp index 1bcf3f4bfc1..998b6c368ce 100644 --- a/src/mongo/db/s/sharding_state.cpp +++ b/src/mongo/db/s/sharding_state.cpp @@ -272,41 +272,24 @@ Status ShardingState::onStaleShardVersion(OperationContext* txn, } } - // At the first attempt try to use the currently loaded metadata and on subsequent attempts use - // the complete metadata - int numRefreshAttempts = 0; - - while (true) { - numRefreshAttempts++; - - auto refreshStatusAndVersion = - _refreshMetadata(txn, nss, (currentMetadata ? currentMetadata.getMetadata() : nullptr)); - if (refreshStatusAndVersion.isOK()) { - LOG(1) << "Successfully refreshed metadata for " << nss.ns() << " to " - << refreshStatusAndVersion.getValue(); - return Status::OK(); - } - - if (refreshStatusAndVersion == ErrorCodes::RemoteChangeDetected && - numRefreshAttempts < kMaxNumMetadataRefreshAttempts) { - currentMetadata = ScopedCollectionMetadata(); - - log() << "Refresh failed and will be retried as full reload " - << refreshStatusAndVersion.getStatus(); - continue; - } - - return refreshStatusAndVersion.getStatus(); - } - - - MONGO_UNREACHABLE; + auto refreshStatusAndVersion = + _refreshMetadata(txn, nss, (currentMetadata ? currentMetadata.getMetadata() : nullptr)); + return refreshStatusAndVersion.getStatus(); } Status ShardingState::refreshMetadataNow(OperationContext* txn, const NamespaceString& nss, ChunkVersion* latestShardVersion) { - auto refreshLatestShardVersionStatus = _refreshMetadata(txn, nss, nullptr); + ScopedCollectionMetadata currentMetadata; + + { + AutoGetCollection autoColl(txn, nss, MODE_IS); + + currentMetadata = CollectionShardingState::get(txn, nss)->getMetadata(); + } + + auto refreshLatestShardVersionStatus = + _refreshMetadata(txn, nss, currentMetadata.getMetadata()); if (!refreshLatestShardVersionStatus.isOK()) { return refreshLatestShardVersionStatus.getStatus(); } @@ -667,24 +650,34 @@ StatusWith<ChunkVersion> ShardingState::_refreshMetadata( } } - // The _configServerTickets serializes this process such that only a small number of threads can - // try to refresh at the same time in order to avoid overloading the config server - _configServerTickets.waitForTicket(); - TicketHolderReleaser needTicketFrom(&_configServerTickets); - + Status status = {ErrorCodes::InternalError, "metadata refresh not performed"}; Timer t; + int numAttempts = 0; + std::unique_ptr<CollectionMetadata> remoteMetadata; + + do { + // The _configServerTickets serializes this process such that only a small number of threads + // can try to refresh at the same time in order to avoid overloading the config server. + _configServerTickets.waitForTicket(); + TicketHolderReleaser needTicketFrom(&_configServerTickets); + + if (status == ErrorCodes::RemoteChangeDetected) { + metadataForDiff = nullptr; + log() << "Refresh failed and will be retried as full reload " << status; + } - log() << "MetadataLoader loading chunks for " << nss.ns() << " based on: " - << (metadataForDiff ? metadataForDiff->getCollVersion().toString() : "(empty)"); - - std::unique_ptr<CollectionMetadata> remoteMetadata(stdx::make_unique<CollectionMetadata>()); + log() << "MetadataLoader loading chunks for " << nss.ns() << " based on: " + << (metadataForDiff ? metadataForDiff->getCollVersion().toString() : "(empty)"); - Status status = MetadataLoader::makeCollectionMetadata(txn, - grid.catalogClient(txn), - nss.ns(), - getShardName(), - metadataForDiff, - remoteMetadata.get()); + remoteMetadata = stdx::make_unique<CollectionMetadata>(); + status = MetadataLoader::makeCollectionMetadata(txn, + grid.catalogClient(txn), + nss.ns(), + getShardName(), + metadataForDiff, + remoteMetadata.get()); + } while (status == ErrorCodes::RemoteChangeDetected && + ++numAttempts < kMaxNumMetadataRefreshAttempts); if (!status.isOK() && status != ErrorCodes::NamespaceNotFound) { warning() << "MetadataLoader failed after " << t.millis() << " ms" @@ -699,24 +692,23 @@ StatusWith<ChunkVersion> ShardingState::_refreshMetadata( auto css = CollectionShardingState::get(txn, nss); - if (status.isOK()) { - css->refreshMetadata(txn, std::move(remoteMetadata)); - - auto metadata = css->getMetadata(); + if (!status.isOK()) { + invariant(status == ErrorCodes::NamespaceNotFound); + css->refreshMetadata(txn, nullptr); - log() << "MetadataLoader took " << t.millis() << " ms and found version " - << metadata->getCollVersion(); + log() << "MetadataLoader took " << t.millis() << " ms and did not find the namespace"; - return metadata->getShardVersion(); + return ChunkVersion::UNSHARDED(); } - invariant(status == ErrorCodes::NamespaceNotFound); + css->refreshMetadata(txn, std::move(remoteMetadata)); - css->refreshMetadata(txn, nullptr); + auto metadata = css->getMetadata(); - log() << "MetadataLoader took " << t.millis() << " ms and did not find the namespace"; + log() << "MetadataLoader took " << t.millis() << " ms and found version " + << metadata->getCollVersion(); - return ChunkVersion::UNSHARDED(); + return metadata->getShardVersion(); } StatusWith<ScopedRegisterDonateChunk> ShardingState::registerDonateChunk( diff --git a/src/mongo/db/s/sharding_state_test.cpp b/src/mongo/db/s/sharding_state_test.cpp index 0156058676d..173964edaea 100644 --- a/src/mongo/db/s/sharding_state_test.cpp +++ b/src/mongo/db/s/sharding_state_test.cpp @@ -30,15 +30,21 @@ #include "mongo/client/remote_command_targeter_mock.h" #include "mongo/client/replica_set_monitor.h" +#include "mongo/db/client.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/jsobj.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/query/query_request.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/s/type_shard_identity.h" #include "mongo/db/server_options.h" #include "mongo/db/service_context_noop.h" #include "mongo/db/storage/storage_options.h" +#include "mongo/s/catalog/dist_lock_manager_mock.h" +#include "mongo/s/catalog/sharding_catalog_client_impl.h" +#include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/catalog/type_collection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/sharding_mongod_test_fixture.h" @@ -54,11 +60,42 @@ public: return &_shardingState; } + std::string shardName() const { + return _shardName.toString(); + } + + void setupCollectionMetadata(const NamespaceString& nss, + const OID& epoch, + const std::vector<BSONObj>& initChunks) { + auto future = launchAsync([this, &nss] { + ChunkVersion latestShardVersion; + Client::initThreadIfNotAlready(); + ASSERT_OK( + shardingState()->refreshMetadataNow(operationContext(), nss, &latestShardVersion)); + }); + + ChunkVersion initVersion(1, 0, epoch); + onFindCommand([&nss, &initVersion](const RemoteCommandRequest&) { + CollectionType coll; + coll.setNs(nss); + coll.setUpdatedAt(Date_t()); + coll.setEpoch(initVersion.epoch()); + coll.setKeyPattern(BSON("x" << 1)); + return std::vector<BSONObj>{coll.toBSON()}; + }); + + onFindCommand([&initChunks](const RemoteCommandRequest&) { return initChunks; }); + + future.timed_get(kFutureTimeout); + } + protected: // Used to write to set up local collections before exercising server logic. std::unique_ptr<DBDirectClient> _dbDirectClient; void setUp() override { + _shardName = ShardId("a"); + serverGlobalParams.clusterRole = ClusterRole::None; ShardingMongodTestFixture::setUp(); @@ -76,6 +113,7 @@ protected: auto configTargeter = RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter()); configTargeter->setConnectionStringReturnValue(configConnStr); + configTargeter->setFindHostReturnValue(configConnStr.getServers()[0]); return Status::OK(); }); @@ -96,20 +134,32 @@ protected: ShardingMongodTestFixture::tearDown(); } + std::unique_ptr<DistLockManager> makeDistLockManager( + std::unique_ptr<DistLockCatalog> distLockCatalog) override { + return stdx::make_unique<DistLockManagerMock>(nullptr); + } + + std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient( + std::unique_ptr<DistLockManager> distLockManager) override { + invariant(distLockManager); + return stdx::make_unique<ShardingCatalogClientImpl>(std::move(distLockManager)); + } + private: ShardingState _shardingState; + ShardId _shardName; }; TEST_F(ShardingStateTest, ValidShardIdentitySucceeds) { ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName("a"); + shardIdentity.setShardName(shardName()); shardIdentity.setClusterId(OID::gen()); ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); ASSERT_TRUE(shardingState()->enabled()); - ASSERT_EQ("a", shardingState()->getShardName()); + ASSERT_EQ(shardName(), shardingState()->getShardName()); ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(operationContext()).toString()); } @@ -117,7 +167,7 @@ TEST_F(ShardingStateTest, InitWhilePreviouslyInErrorStateWillStayInErrorState) { ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName("a"); + shardIdentity.setShardName(shardName()); shardIdentity.setClusterId(OID::gen()); shardingState()->setGlobalInitMethodForTest( @@ -152,7 +202,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) { ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName("a"); + shardIdentity.setShardName(shardName()); shardIdentity.setClusterId(clusterID); ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); @@ -160,7 +210,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) { ShardIdentityType shardIdentity2; shardIdentity2.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity2.setShardName("a"); + shardIdentity2.setShardName(shardName()); shardIdentity2.setClusterId(clusterID); shardingState()->setGlobalInitMethodForTest( @@ -171,7 +221,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) { ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity2)); ASSERT_TRUE(shardingState()->enabled()); - ASSERT_EQ("a", shardingState()->getShardName()); + ASSERT_EQ(shardName(), shardingState()->getShardName()); ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(operationContext()).toString()); } @@ -180,7 +230,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) { ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName("a"); + shardIdentity.setShardName(shardName()); shardIdentity.setClusterId(clusterID); ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); @@ -188,7 +238,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) { ShardIdentityType shardIdentity2; shardIdentity2.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "b:2,c:3", "config")); - shardIdentity2.setShardName("a"); + shardIdentity2.setShardName(shardName()); shardIdentity2.setClusterId(clusterID); shardingState()->setGlobalInitMethodForTest( @@ -199,7 +249,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) { ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity2)); ASSERT_TRUE(shardingState()->enabled()); - ASSERT_EQ("a", shardingState()->getShardName()); + ASSERT_EQ(shardName(), shardingState()->getShardName()); ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(operationContext()).toString()); } @@ -208,7 +258,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentReplSetNameFails) { ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName("a"); + shardIdentity.setShardName(shardName()); shardIdentity.setClusterId(clusterID); ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); @@ -216,7 +266,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentReplSetNameFails) { ShardIdentityType shardIdentity2; shardIdentity2.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "configRS")); - shardIdentity2.setShardName("a"); + shardIdentity2.setShardName(shardName()); shardIdentity2.setClusterId(clusterID); shardingState()->setGlobalInitMethodForTest( @@ -228,7 +278,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentReplSetNameFails) { ASSERT_EQ(ErrorCodes::InconsistentShardIdentity, status); ASSERT_TRUE(shardingState()->enabled()); - ASSERT_EQ("a", shardingState()->getShardName()); + ASSERT_EQ(shardName(), shardingState()->getShardName()); ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(operationContext()).toString()); } @@ -237,7 +287,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentShardNameFails) { ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName("a"); + shardIdentity.setShardName(shardName()); shardIdentity.setClusterId(clusterID); ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); @@ -257,7 +307,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentShardNameFails) { ASSERT_EQ(ErrorCodes::InconsistentShardIdentity, status); ASSERT_TRUE(shardingState()->enabled()); - ASSERT_EQ("a", shardingState()->getShardName()); + ASSERT_EQ(shardName(), shardingState()->getShardName()); ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(operationContext()).toString()); } @@ -265,7 +315,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentClusterIdFails) { ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName("a"); + shardIdentity.setShardName(shardName()); shardIdentity.setClusterId(OID::gen()); ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); @@ -273,7 +323,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentClusterIdFails) { ShardIdentityType shardIdentity2; shardIdentity2.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity2.setShardName("a"); + shardIdentity2.setShardName(shardName()); shardIdentity2.setClusterId(OID::gen()); shardingState()->setGlobalInitMethodForTest( @@ -285,7 +335,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentClusterIdFails) { ASSERT_EQ(ErrorCodes::InconsistentShardIdentity, status); ASSERT_TRUE(shardingState()->enabled()); - ASSERT_EQ("a", shardingState()->getShardName()); + ASSERT_EQ(shardName(), shardingState()->getShardName()); ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(operationContext()).toString()); } @@ -327,7 +377,7 @@ TEST_F(ShardingStateTest, ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName("a"); + shardIdentity.setShardName(shardName()); shardIdentity.setClusterId(OID::gen()); ASSERT_OK(shardIdentity.validate()); serverGlobalParams.overrideShardIdentity = shardIdentity.toBSON(); @@ -375,7 +425,7 @@ TEST_F(ShardingStateTest, ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName("a"); + shardIdentity.setShardName(shardName()); shardIdentity.setClusterId(OID::gen()); ASSERT_OK(shardIdentity.validate()); serverGlobalParams.overrideShardIdentity = shardIdentity.toBSON(); @@ -415,7 +465,7 @@ TEST_F(ShardingStateTest, ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName("a"); + shardIdentity.setShardName(shardName()); shardIdentity.setClusterId(OID::gen()); ASSERT_OK(shardIdentity.validate()); serverGlobalParams.overrideShardIdentity = shardIdentity.toBSON(); @@ -485,7 +535,7 @@ TEST_F(ShardingStateTest, ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName("a"); + shardIdentity.setShardName(shardName()); shardIdentity.setClusterId(OID::gen()); ASSERT_OK(shardIdentity.validate()); BSONObj validShardIdentity = shardIdentity.toBSON(); @@ -545,7 +595,7 @@ TEST_F(ShardingStateTest, ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName("a"); + shardIdentity.setShardName(shardName()); shardIdentity.setClusterId(OID::gen()); ASSERT_OK(shardIdentity.validate()); BSONObj validShardIdentity = shardIdentity.toBSON(); @@ -560,5 +610,293 @@ TEST_F(ShardingStateTest, ASSERT_FALSE(swShardingInitialized.getValue()); } +TEST_F(ShardingStateTest, MetadataRefreshShouldUseDiffQuery) { + ShardIdentityType shardIdentity; + shardIdentity.setConfigsvrConnString( + ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); + shardIdentity.setShardName(shardName()); + shardIdentity.setClusterId(OID::gen()); + + ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); + + const NamespaceString nss("test.user"); + const OID initEpoch(OID::gen()); + + { + ChunkType chunk; + chunk.setNS(nss.ns()); + chunk.setMin(BSON("x" << 0)); + chunk.setMax(BSON("x" << 10)); + chunk.setShard(ShardId(shardName())); + chunk.setVersion(ChunkVersion(2, 0, initEpoch)); + setupCollectionMetadata(nss, initEpoch, std::vector<BSONObj>{chunk.toBSON()}); + } + + const ChunkVersion newVersion(3, 0, initEpoch); + auto future = launchAsync([&] { + Client::initThreadIfNotAlready(); + ASSERT_OK(shardingState()->onStaleShardVersion(operationContext(), nss, newVersion)); + }); + + onFindCommand([&nss, &initEpoch](const RemoteCommandRequest&) { + CollectionType coll; + coll.setNs(nss); + coll.setUpdatedAt(Date_t()); + coll.setEpoch(initEpoch); + coll.setKeyPattern(BSON("x" << 1)); + return std::vector<BSONObj>{coll.toBSON()}; + }); + + onFindCommand([this, &nss, &initEpoch](const RemoteCommandRequest& request) { + auto diffQueryStatus = QueryRequest::makeFromFindCommand(nss, request.cmdObj, false); + ASSERT_OK(diffQueryStatus.getStatus()); + + auto diffQuery = std::move(diffQueryStatus.getValue()); + ASSERT_BSONOBJ_EQ(BSON("ns" << nss.ns() << "lastmod" << BSON("$gte" << Timestamp(2, 0))), + diffQuery->getFilter()); + + ChunkType chunk; + chunk.setNS(nss.ns()); + chunk.setMin(BSON("x" << 10)); + chunk.setMax(BSON("x" << 20)); + chunk.setShard(ShardId(shardName())); + chunk.setVersion(ChunkVersion(3, 10, initEpoch)); + return std::vector<BSONObj>{chunk.toBSON()}; + }); + + future.timed_get(kFutureTimeout); +} + +/** + * Test where the epoch changed right before the chunk diff query. + */ +TEST_F(ShardingStateTest, MetadataRefreshShouldUseFullQueryOnEpochMismatch) { + ShardIdentityType shardIdentity; + shardIdentity.setConfigsvrConnString( + ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); + shardIdentity.setShardName(shardName()); + shardIdentity.setClusterId(OID::gen()); + + ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); + + const NamespaceString nss("test.user"); + const OID initEpoch(OID::gen()); + + { + ChunkType chunk; + chunk.setNS(nss.ns()); + chunk.setMin(BSON("x" << 0)); + chunk.setMax(BSON("x" << 10)); + chunk.setShard(ShardId(shardName())); + chunk.setVersion(ChunkVersion(2, 0, initEpoch)); + setupCollectionMetadata(nss, initEpoch, std::vector<BSONObj>{chunk.toBSON()}); + } + + + auto future = launchAsync([&] { + Client::initThreadIfNotAlready(); + ASSERT_OK(shardingState()->onStaleShardVersion( + operationContext(), nss, ChunkVersion(3, 0, initEpoch))); + }); + + onFindCommand([&nss, &initEpoch](const RemoteCommandRequest&) { + CollectionType coll; + coll.setNs(nss); + coll.setUpdatedAt(Date_t()); + coll.setEpoch(initEpoch); + coll.setKeyPattern(BSON("x" << 1)); + return std::vector<BSONObj>{coll.toBSON()}; + }); + + // Now when the diff query is performed, it will get chunks with a different epoch. + const ChunkVersion newVersion(3, 0, OID::gen()); + onFindCommand([this, &nss, &newVersion](const RemoteCommandRequest& request) { + auto diffQueryStatus = QueryRequest::makeFromFindCommand(nss, request.cmdObj, false); + ASSERT_OK(diffQueryStatus.getStatus()); + + auto diffQuery = std::move(diffQueryStatus.getValue()); + ASSERT_BSONOBJ_EQ(BSON("ns" << nss.ns() << "lastmod" << BSON("$gte" << Timestamp(2, 0))), + diffQuery->getFilter()); + + ChunkType chunk; + chunk.setNS(nss.ns()); + chunk.setMin(BSON("x" << 10)); + chunk.setMax(BSON("x" << 20)); + chunk.setShard(ShardId(shardName())); + chunk.setVersion(ChunkVersion(3, 10, newVersion.epoch())); + return std::vector<BSONObj>{chunk.toBSON()}; + }); + + // Retry the refresh again. Now doing a full reload. + + onFindCommand([&nss, &newVersion](const RemoteCommandRequest&) { + CollectionType coll; + coll.setNs(nss); + coll.setUpdatedAt(Date_t()); + coll.setEpoch(newVersion.epoch()); + coll.setKeyPattern(BSON("x" << 1)); + return std::vector<BSONObj>{coll.toBSON()}; + }); + + onFindCommand([this, &nss, &newVersion](const RemoteCommandRequest& request) { + auto diffQueryStatus = QueryRequest::makeFromFindCommand(nss, request.cmdObj, false); + ASSERT_OK(diffQueryStatus.getStatus()); + + auto diffQuery = std::move(diffQueryStatus.getValue()); + ASSERT_BSONOBJ_EQ(BSON("ns" << nss.ns() << "lastmod" << BSON("$gte" << Timestamp(0, 0))), + diffQuery->getFilter()); + + ChunkType chunk; + chunk.setNS(nss.ns()); + chunk.setMin(BSON("x" << 10)); + chunk.setMax(BSON("x" << 20)); + chunk.setShard(ShardId(shardName())); + chunk.setVersion(ChunkVersion(3, 10, newVersion.epoch())); + return std::vector<BSONObj>{chunk.toBSON()}; + }); + + future.timed_get(kFutureTimeout); +} + +TEST_F(ShardingStateTest, FullMetadataOnEpochMismatchShouldStopAfterMaxRetries) { + ShardIdentityType shardIdentity; + shardIdentity.setConfigsvrConnString( + ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); + shardIdentity.setShardName(shardName()); + shardIdentity.setClusterId(OID::gen()); + + ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); + + const NamespaceString nss("test.user"); + const OID initEpoch(OID::gen()); + + { + ChunkType chunk; + chunk.setNS(nss.ns()); + chunk.setMin(BSON("x" << 0)); + chunk.setMax(BSON("x" << 10)); + chunk.setShard(ShardId(shardName())); + chunk.setVersion(ChunkVersion(2, 0, initEpoch)); + setupCollectionMetadata(nss, initEpoch, std::vector<BSONObj>{chunk.toBSON()}); + } + + + auto future = launchAsync([&] { + Client::initThreadIfNotAlready(); + auto status = shardingState()->onStaleShardVersion( + operationContext(), nss, ChunkVersion(3, 0, initEpoch)); + ASSERT_EQ(ErrorCodes::RemoteChangeDetected, status); + }); + + OID lastEpoch(initEpoch); + OID nextEpoch(OID::gen()); + for (int tries = 0; tries < 3; tries++) { + onFindCommand([&nss, &lastEpoch](const RemoteCommandRequest&) { + CollectionType coll; + coll.setNs(nss); + coll.setUpdatedAt(Date_t()); + coll.setEpoch(lastEpoch); + coll.setKeyPattern(BSON("x" << 1)); + return std::vector<BSONObj>{coll.toBSON()}; + }); + + onFindCommand([this, &nss, &nextEpoch, tries](const RemoteCommandRequest& request) { + auto diffQueryStatus = QueryRequest::makeFromFindCommand(nss, request.cmdObj, false); + ASSERT_OK(diffQueryStatus.getStatus()); + + auto diffQuery = std::move(diffQueryStatus.getValue()); + Timestamp expectedLastMod = (tries == 0) ? Timestamp(2, 0) : Timestamp(0, 0); + ASSERT_BSONOBJ_EQ( + BSON("ns" << nss.ns() << "lastmod" << BSON("$gte" << expectedLastMod)), + diffQuery->getFilter()); + + ChunkType chunk; + chunk.setNS(nss.ns()); + chunk.setMin(BSON("x" << 10)); + chunk.setMax(BSON("x" << 20)); + chunk.setShard(ShardId(shardName())); + chunk.setVersion(ChunkVersion(3, 10, nextEpoch)); + return std::vector<BSONObj>{chunk.toBSON()}; + }); + + lastEpoch = nextEpoch; + nextEpoch = OID::gen(); + } + + future.timed_get(kFutureTimeout); +} + +TEST_F(ShardingStateTest, MetadataRefreshShouldBeOkWhenCollectionWasDropped) { + ShardIdentityType shardIdentity; + shardIdentity.setConfigsvrConnString( + ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); + shardIdentity.setShardName(shardName()); + shardIdentity.setClusterId(OID::gen()); + + ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); + + const NamespaceString nss("test.user"); + const OID initEpoch(OID::gen()); + + { + ChunkType chunk; + chunk.setNS(nss.ns()); + chunk.setMin(BSON("x" << 0)); + chunk.setMax(BSON("x" << 10)); + chunk.setShard(ShardId(shardName())); + chunk.setVersion(ChunkVersion(2, 0, initEpoch)); + setupCollectionMetadata(nss, initEpoch, std::vector<BSONObj>{chunk.toBSON()}); + } + + const ChunkVersion newVersion(3, 0, initEpoch); + auto future = launchAsync([&] { + Client::initThreadIfNotAlready(); + ASSERT_OK(shardingState()->onStaleShardVersion(operationContext(), nss, newVersion)); + }); + + onFindCommand([&nss, &initEpoch](const RemoteCommandRequest&) { + CollectionType coll; + coll.setNs(nss); + coll.setUpdatedAt(Date_t()); + coll.setEpoch(initEpoch); + coll.setKeyPattern(BSON("x" << 1)); + coll.setDropped(true); + return std::vector<BSONObj>{coll.toBSON()}; + }); + + future.timed_get(kFutureTimeout); +} + +TEST_F(ShardingStateTest, MetadataRefreshShouldNotRetryOtherTypesOfError) { + ShardIdentityType shardIdentity; + shardIdentity.setConfigsvrConnString( + ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); + shardIdentity.setShardName(shardName()); + shardIdentity.setClusterId(OID::gen()); + + ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); + + const NamespaceString nss("test.user"); + const OID initEpoch(OID::gen()); + + { + ChunkType chunk; + chunk.setNS(nss.ns()); + chunk.setMin(BSON("x" << 0)); + chunk.setMax(BSON("x" << 10)); + chunk.setShard(ShardId(shardName())); + chunk.setVersion(ChunkVersion(2, 0, initEpoch)); + setupCollectionMetadata(nss, initEpoch, std::vector<BSONObj>{chunk.toBSON()}); + } + + auto configTargeter = + RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter()); + configTargeter->setFindHostReturnValue({ErrorCodes::HostNotFound, "host erased by test"}); + + auto status = shardingState()->onStaleShardVersion( + operationContext(), nss, ChunkVersion(3, 0, initEpoch)); + ASSERT_EQ(ErrorCodes::HostNotFound, status); +} + } // namespace } // namespace mongo |