summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Randolph Tan <randolph@10gen.com> 2016-12-08 15:24:22 -0500
committer: Randolph Tan <randolph@10gen.com> 2017-01-20 10:24:44 -0500
commit: 774514cfad04866b42a1deb27b48488dec0f7520 (patch)
tree: 79001e110a689d8a819b1e28c97c53b74d2cb051
parent: 098ef62876fcb6ac6fe5cd26201704c308bd5fea (diff)
download: mongo-774514cfad04866b42a1deb27b48488dec0f7520.tar.gz
SERVER-26791 Shard metadata commands should perform partial refresh as much as possible
(cherry picked from commit 6add2c82bacecd5f54613ebf4be1553f3b046cbc)
-rw-r--r-- src/mongo/db/s/SConscript 3
-rw-r--r-- src/mongo/db/s/sharding_state.cpp 104
-rw-r--r-- src/mongo/db/s/sharding_state_test.cpp 382
3 files changed, 411 insertions, 78 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 53e74c1d5e3..cafeabe4537 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -210,6 +210,9 @@ env.CppUnitTest(
'sharding_state_test.cpp',
],
LIBDEPS=[
+ '$BUILD_DIR/mongo/db/query/query_request',
+ '$BUILD_DIR/mongo/s/catalog/dist_lock_manager_mock',
+ '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl',
'$BUILD_DIR/mongo/s/catalog/sharding_catalog_mock',
'$BUILD_DIR/mongo/s/sharding_mongod_test_fixture',
],
diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp
index 1bcf3f4bfc1..998b6c368ce 100644
--- a/src/mongo/db/s/sharding_state.cpp
+++ b/src/mongo/db/s/sharding_state.cpp
@@ -272,41 +272,24 @@ Status ShardingState::onStaleShardVersion(OperationContext* txn,
}
}
- // At the first attempt try to use the currently loaded metadata and on subsequent attempts use
- // the complete metadata
- int numRefreshAttempts = 0;
-
- while (true) {
- numRefreshAttempts++;
-
- auto refreshStatusAndVersion =
- _refreshMetadata(txn, nss, (currentMetadata ? currentMetadata.getMetadata() : nullptr));
- if (refreshStatusAndVersion.isOK()) {
- LOG(1) << "Successfully refreshed metadata for " << nss.ns() << " to "
- << refreshStatusAndVersion.getValue();
- return Status::OK();
- }
-
- if (refreshStatusAndVersion == ErrorCodes::RemoteChangeDetected &&
- numRefreshAttempts < kMaxNumMetadataRefreshAttempts) {
- currentMetadata = ScopedCollectionMetadata();
-
- log() << "Refresh failed and will be retried as full reload "
- << refreshStatusAndVersion.getStatus();
- continue;
- }
-
- return refreshStatusAndVersion.getStatus();
- }
-
-
- MONGO_UNREACHABLE;
+ auto refreshStatusAndVersion =
+ _refreshMetadata(txn, nss, (currentMetadata ? currentMetadata.getMetadata() : nullptr));
+ return refreshStatusAndVersion.getStatus();
}
Status ShardingState::refreshMetadataNow(OperationContext* txn,
const NamespaceString& nss,
ChunkVersion* latestShardVersion) {
- auto refreshLatestShardVersionStatus = _refreshMetadata(txn, nss, nullptr);
+ ScopedCollectionMetadata currentMetadata;
+
+ {
+ AutoGetCollection autoColl(txn, nss, MODE_IS);
+
+ currentMetadata = CollectionShardingState::get(txn, nss)->getMetadata();
+ }
+
+ auto refreshLatestShardVersionStatus =
+ _refreshMetadata(txn, nss, currentMetadata.getMetadata());
if (!refreshLatestShardVersionStatus.isOK()) {
return refreshLatestShardVersionStatus.getStatus();
}
@@ -667,24 +650,34 @@ StatusWith<ChunkVersion> ShardingState::_refreshMetadata(
}
}
- // The _configServerTickets serializes this process such that only a small number of threads can
- // try to refresh at the same time in order to avoid overloading the config server
- _configServerTickets.waitForTicket();
- TicketHolderReleaser needTicketFrom(&_configServerTickets);
-
+ Status status = {ErrorCodes::InternalError, "metadata refresh not performed"};
Timer t;
+ int numAttempts = 0;
+ std::unique_ptr<CollectionMetadata> remoteMetadata;
+
+ do {
+ // The _configServerTickets serializes this process such that only a small number of threads
+ // can try to refresh at the same time in order to avoid overloading the config server.
+ _configServerTickets.waitForTicket();
+ TicketHolderReleaser needTicketFrom(&_configServerTickets);
+
+ if (status == ErrorCodes::RemoteChangeDetected) {
+ metadataForDiff = nullptr;
+ log() << "Refresh failed and will be retried as full reload " << status;
+ }
- log() << "MetadataLoader loading chunks for " << nss.ns() << " based on: "
- << (metadataForDiff ? metadataForDiff->getCollVersion().toString() : "(empty)");
-
- std::unique_ptr<CollectionMetadata> remoteMetadata(stdx::make_unique<CollectionMetadata>());
+ log() << "MetadataLoader loading chunks for " << nss.ns() << " based on: "
+ << (metadataForDiff ? metadataForDiff->getCollVersion().toString() : "(empty)");
- Status status = MetadataLoader::makeCollectionMetadata(txn,
- grid.catalogClient(txn),
- nss.ns(),
- getShardName(),
- metadataForDiff,
- remoteMetadata.get());
+ remoteMetadata = stdx::make_unique<CollectionMetadata>();
+ status = MetadataLoader::makeCollectionMetadata(txn,
+ grid.catalogClient(txn),
+ nss.ns(),
+ getShardName(),
+ metadataForDiff,
+ remoteMetadata.get());
+ } while (status == ErrorCodes::RemoteChangeDetected &&
+ ++numAttempts < kMaxNumMetadataRefreshAttempts);
if (!status.isOK() && status != ErrorCodes::NamespaceNotFound) {
warning() << "MetadataLoader failed after " << t.millis() << " ms"
@@ -699,24 +692,23 @@ StatusWith<ChunkVersion> ShardingState::_refreshMetadata(
auto css = CollectionShardingState::get(txn, nss);
- if (status.isOK()) {
- css->refreshMetadata(txn, std::move(remoteMetadata));
-
- auto metadata = css->getMetadata();
+ if (!status.isOK()) {
+ invariant(status == ErrorCodes::NamespaceNotFound);
+ css->refreshMetadata(txn, nullptr);
- log() << "MetadataLoader took " << t.millis() << " ms and found version "
- << metadata->getCollVersion();
+ log() << "MetadataLoader took " << t.millis() << " ms and did not find the namespace";
- return metadata->getShardVersion();
+ return ChunkVersion::UNSHARDED();
}
- invariant(status == ErrorCodes::NamespaceNotFound);
+ css->refreshMetadata(txn, std::move(remoteMetadata));
- css->refreshMetadata(txn, nullptr);
+ auto metadata = css->getMetadata();
- log() << "MetadataLoader took " << t.millis() << " ms and did not find the namespace";
+ log() << "MetadataLoader took " << t.millis() << " ms and found version "
+ << metadata->getCollVersion();
- return ChunkVersion::UNSHARDED();
+ return metadata->getShardVersion();
}
StatusWith<ScopedRegisterDonateChunk> ShardingState::registerDonateChunk(
diff --git a/src/mongo/db/s/sharding_state_test.cpp b/src/mongo/db/s/sharding_state_test.cpp
index 0156058676d..173964edaea 100644
--- a/src/mongo/db/s/sharding_state_test.cpp
+++ b/src/mongo/db/s/sharding_state_test.cpp
@@ -30,15 +30,21 @@
#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/client/replica_set_monitor.h"
+#include "mongo/db/client.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/query/query_request.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/s/type_shard_identity.h"
#include "mongo/db/server_options.h"
#include "mongo/db/service_context_noop.h"
#include "mongo/db/storage/storage_options.h"
+#include "mongo/s/catalog/dist_lock_manager_mock.h"
+#include "mongo/s/catalog/sharding_catalog_client_impl.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/sharding_mongod_test_fixture.h"
@@ -54,11 +60,42 @@ public:
return &_shardingState;
}
+ std::string shardName() const {  // Returns the fixture's shard id (_shardName) as a std::string.
+ return _shardName.toString();
+ }
+
+ void setupCollectionMetadata(const NamespaceString& nss,  // Runs refreshMetadataNow for nss on another thread while answering two mocked find commands.
+ const OID& epoch,  // epoch reported by the mocked collection document
+ const std::vector<BSONObj>& initChunks) {  // documents returned verbatim by the second mocked find
+ auto future = launchAsync([this, &nss] {  // nss is captured by reference; the future is joined before this function returns
+ ChunkVersion latestShardVersion;
+ Client::initThreadIfNotAlready();
+ ASSERT_OK(
+ shardingState()->refreshMetadataNow(operationContext(), nss, &latestShardVersion));
+ });
+
+ ChunkVersion initVersion(1, 0, epoch);  // only initVersion.epoch() is read below
+ onFindCommand([&nss, &initVersion](const RemoteCommandRequest&) {  // first find: reply with a single collection document
+ CollectionType coll;
+ coll.setNs(nss);
+ coll.setUpdatedAt(Date_t());
+ coll.setEpoch(initVersion.epoch());
+ coll.setKeyPattern(BSON("x" << 1));
+ return std::vector<BSONObj>{coll.toBSON()};
+ });
+
+ onFindCommand([&initChunks](const RemoteCommandRequest&) { return initChunks; });  // second find: presumably the chunk query -- the request is not inspected here
+
+ future.timed_get(kFutureTimeout);  // wait for the async refresh (bounded by kFutureTimeout)
+ }
+
protected:
// Used to write to set up local collections before exercising server logic.
std::unique_ptr<DBDirectClient> _dbDirectClient;
void setUp() override {
+ _shardName = ShardId("a");
+
serverGlobalParams.clusterRole = ClusterRole::None;
ShardingMongodTestFixture::setUp();
@@ -76,6 +113,7 @@ protected:
auto configTargeter =
RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter());
configTargeter->setConnectionStringReturnValue(configConnStr);
+ configTargeter->setFindHostReturnValue(configConnStr.getServers()[0]);
return Status::OK();
});
@@ -96,20 +134,32 @@ protected:
ShardingMongodTestFixture::tearDown();
}
+ std::unique_ptr<DistLockManager> makeDistLockManager(  // Fixture override: supplies a DistLockManagerMock.
+ std::unique_ptr<DistLockCatalog> distLockCatalog) override {  // distLockCatalog is not used; the mock is built with nullptr
+ return stdx::make_unique<DistLockManagerMock>(nullptr);
+ }
+
+ std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient(  // Fixture override: builds a ShardingCatalogClientImpl from the given lock manager.
+ std::unique_ptr<DistLockManager> distLockManager) override {
+ invariant(distLockManager);  // the lock manager must be non-null
+ return stdx::make_unique<ShardingCatalogClientImpl>(std::move(distLockManager));  // ownership of distLockManager moves into the client
+ }
+
private:
ShardingState _shardingState;
+ ShardId _shardName;
};
TEST_F(ShardingStateTest, ValidShardIdentitySucceeds) {
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity.setShardName("a");
+ shardIdentity.setShardName(shardName());
shardIdentity.setClusterId(OID::gen());
ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity));
ASSERT_TRUE(shardingState()->enabled());
- ASSERT_EQ("a", shardingState()->getShardName());
+ ASSERT_EQ(shardName(), shardingState()->getShardName());
ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(operationContext()).toString());
}
@@ -117,7 +167,7 @@ TEST_F(ShardingStateTest, InitWhilePreviouslyInErrorStateWillStayInErrorState) {
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity.setShardName("a");
+ shardIdentity.setShardName(shardName());
shardIdentity.setClusterId(OID::gen());
shardingState()->setGlobalInitMethodForTest(
@@ -152,7 +202,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) {
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity.setShardName("a");
+ shardIdentity.setShardName(shardName());
shardIdentity.setClusterId(clusterID);
ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity));
@@ -160,7 +210,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) {
ShardIdentityType shardIdentity2;
shardIdentity2.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity2.setShardName("a");
+ shardIdentity2.setShardName(shardName());
shardIdentity2.setClusterId(clusterID);
shardingState()->setGlobalInitMethodForTest(
@@ -171,7 +221,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) {
ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity2));
ASSERT_TRUE(shardingState()->enabled());
- ASSERT_EQ("a", shardingState()->getShardName());
+ ASSERT_EQ(shardName(), shardingState()->getShardName());
ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(operationContext()).toString());
}
@@ -180,7 +230,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) {
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity.setShardName("a");
+ shardIdentity.setShardName(shardName());
shardIdentity.setClusterId(clusterID);
ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity));
@@ -188,7 +238,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) {
ShardIdentityType shardIdentity2;
shardIdentity2.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "b:2,c:3", "config"));
- shardIdentity2.setShardName("a");
+ shardIdentity2.setShardName(shardName());
shardIdentity2.setClusterId(clusterID);
shardingState()->setGlobalInitMethodForTest(
@@ -199,7 +249,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) {
ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity2));
ASSERT_TRUE(shardingState()->enabled());
- ASSERT_EQ("a", shardingState()->getShardName());
+ ASSERT_EQ(shardName(), shardingState()->getShardName());
ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(operationContext()).toString());
}
@@ -208,7 +258,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentReplSetNameFails) {
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity.setShardName("a");
+ shardIdentity.setShardName(shardName());
shardIdentity.setClusterId(clusterID);
ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity));
@@ -216,7 +266,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentReplSetNameFails) {
ShardIdentityType shardIdentity2;
shardIdentity2.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "configRS"));
- shardIdentity2.setShardName("a");
+ shardIdentity2.setShardName(shardName());
shardIdentity2.setClusterId(clusterID);
shardingState()->setGlobalInitMethodForTest(
@@ -228,7 +278,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentReplSetNameFails) {
ASSERT_EQ(ErrorCodes::InconsistentShardIdentity, status);
ASSERT_TRUE(shardingState()->enabled());
- ASSERT_EQ("a", shardingState()->getShardName());
+ ASSERT_EQ(shardName(), shardingState()->getShardName());
ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(operationContext()).toString());
}
@@ -237,7 +287,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentShardNameFails) {
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity.setShardName("a");
+ shardIdentity.setShardName(shardName());
shardIdentity.setClusterId(clusterID);
ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity));
@@ -257,7 +307,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentShardNameFails) {
ASSERT_EQ(ErrorCodes::InconsistentShardIdentity, status);
ASSERT_TRUE(shardingState()->enabled());
- ASSERT_EQ("a", shardingState()->getShardName());
+ ASSERT_EQ(shardName(), shardingState()->getShardName());
ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(operationContext()).toString());
}
@@ -265,7 +315,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentClusterIdFails) {
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity.setShardName("a");
+ shardIdentity.setShardName(shardName());
shardIdentity.setClusterId(OID::gen());
ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity));
@@ -273,7 +323,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentClusterIdFails) {
ShardIdentityType shardIdentity2;
shardIdentity2.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity2.setShardName("a");
+ shardIdentity2.setShardName(shardName());
shardIdentity2.setClusterId(OID::gen());
shardingState()->setGlobalInitMethodForTest(
@@ -285,7 +335,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentClusterIdFails) {
ASSERT_EQ(ErrorCodes::InconsistentShardIdentity, status);
ASSERT_TRUE(shardingState()->enabled());
- ASSERT_EQ("a", shardingState()->getShardName());
+ ASSERT_EQ(shardName(), shardingState()->getShardName());
ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(operationContext()).toString());
}
@@ -327,7 +377,7 @@ TEST_F(ShardingStateTest,
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity.setShardName("a");
+ shardIdentity.setShardName(shardName());
shardIdentity.setClusterId(OID::gen());
ASSERT_OK(shardIdentity.validate());
serverGlobalParams.overrideShardIdentity = shardIdentity.toBSON();
@@ -375,7 +425,7 @@ TEST_F(ShardingStateTest,
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity.setShardName("a");
+ shardIdentity.setShardName(shardName());
shardIdentity.setClusterId(OID::gen());
ASSERT_OK(shardIdentity.validate());
serverGlobalParams.overrideShardIdentity = shardIdentity.toBSON();
@@ -415,7 +465,7 @@ TEST_F(ShardingStateTest,
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity.setShardName("a");
+ shardIdentity.setShardName(shardName());
shardIdentity.setClusterId(OID::gen());
ASSERT_OK(shardIdentity.validate());
serverGlobalParams.overrideShardIdentity = shardIdentity.toBSON();
@@ -485,7 +535,7 @@ TEST_F(ShardingStateTest,
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity.setShardName("a");
+ shardIdentity.setShardName(shardName());
shardIdentity.setClusterId(OID::gen());
ASSERT_OK(shardIdentity.validate());
BSONObj validShardIdentity = shardIdentity.toBSON();
@@ -545,7 +595,7 @@ TEST_F(ShardingStateTest,
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity.setShardName("a");
+ shardIdentity.setShardName(shardName());
shardIdentity.setClusterId(OID::gen());
ASSERT_OK(shardIdentity.validate());
BSONObj validShardIdentity = shardIdentity.toBSON();
@@ -560,5 +610,293 @@ TEST_F(ShardingStateTest,
ASSERT_FALSE(swShardingInitialized.getValue());
}
+TEST_F(ShardingStateTest, MetadataRefreshShouldUseDiffQuery) {  // With metadata already loaded, onStaleShardVersion must issue the chunk query asserted below.
+ ShardIdentityType shardIdentity;
+ shardIdentity.setConfigsvrConnString(
+ ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
+ shardIdentity.setShardName(shardName());
+ shardIdentity.setClusterId(OID::gen());
+
+ ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity));
+
+ const NamespaceString nss("test.user");
+ const OID initEpoch(OID::gen());
+
+ {  // Seed the shard's metadata with one chunk (min {x: 0}, max {x: 10}) at version (2, 0).
+ ChunkType chunk;
+ chunk.setNS(nss.ns());
+ chunk.setMin(BSON("x" << 0));
+ chunk.setMax(BSON("x" << 10));
+ chunk.setShard(ShardId(shardName()));
+ chunk.setVersion(ChunkVersion(2, 0, initEpoch));
+ setupCollectionMetadata(nss, initEpoch, std::vector<BSONObj>{chunk.toBSON()});
+ }
+
+ const ChunkVersion newVersion(3, 0, initEpoch);  // the version the caller claims to have seen; same epoch as the seeded chunk
+ auto future = launchAsync([&] {  // refresh runs on another thread so this thread can answer the mocked finds
+ Client::initThreadIfNotAlready();
+ ASSERT_OK(shardingState()->onStaleShardVersion(operationContext(), nss, newVersion));
+ });
+
+ onFindCommand([&nss, &initEpoch](const RemoteCommandRequest&) {  // first find: collection document with the unchanged epoch
+ CollectionType coll;
+ coll.setNs(nss);
+ coll.setUpdatedAt(Date_t());
+ coll.setEpoch(initEpoch);
+ coll.setKeyPattern(BSON("x" << 1));
+ return std::vector<BSONObj>{coll.toBSON()};
+ });
+
+ onFindCommand([this, &nss, &initEpoch](const RemoteCommandRequest& request) {  // second find: the chunk query under test
+ auto diffQueryStatus = QueryRequest::makeFromFindCommand(nss, request.cmdObj, false);
+ ASSERT_OK(diffQueryStatus.getStatus());
+
+ auto diffQuery = std::move(diffQueryStatus.getValue());
+ ASSERT_BSONOBJ_EQ(BSON("ns" << nss.ns() << "lastmod" << BSON("$gte" << Timestamp(2, 0))),
+ diffQuery->getFilter());  // filter starts at Timestamp(2, 0), matching the seeded (2, 0) version
+
+ ChunkType chunk;
+ chunk.setNS(nss.ns());
+ chunk.setMin(BSON("x" << 10));
+ chunk.setMax(BSON("x" << 20));
+ chunk.setShard(ShardId(shardName()));
+ chunk.setVersion(ChunkVersion(3, 10, initEpoch));  // reply chunk keeps the original epoch
+ return std::vector<BSONObj>{chunk.toBSON()};
+ });
+
+ future.timed_get(kFutureTimeout);  // wait for the async refresh (bounded by kFutureTimeout)
+}
+
+/**
+ * Test where the epoch changed right before the chunk diff query.
+ * The first chunk query's reply carries a chunk with a new epoch; the test then expects a second
+ * collection/chunk query pair whose chunk filter starts from Timestamp(0, 0), and expects
+ * onStaleShardVersion to return OK.
+ */
+TEST_F(ShardingStateTest, MetadataRefreshShouldUseFullQueryOnEpochMismatch) {
+ ShardIdentityType shardIdentity;
+ shardIdentity.setConfigsvrConnString(
+ ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
+ shardIdentity.setShardName(shardName());
+ shardIdentity.setClusterId(OID::gen());
+
+ ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity));
+
+ const NamespaceString nss("test.user");
+ const OID initEpoch(OID::gen());
+
+ {  // Seed the shard's metadata with one chunk (min {x: 0}, max {x: 10}) at version (2, 0).
+ ChunkType chunk;
+ chunk.setNS(nss.ns());
+ chunk.setMin(BSON("x" << 0));
+ chunk.setMax(BSON("x" << 10));
+ chunk.setShard(ShardId(shardName()));
+ chunk.setVersion(ChunkVersion(2, 0, initEpoch));
+ setupCollectionMetadata(nss, initEpoch, std::vector<BSONObj>{chunk.toBSON()});
+ }
+
+
+ auto future = launchAsync([&] {  // refresh runs on another thread so this thread can answer the mocked finds
+ Client::initThreadIfNotAlready();
+ ASSERT_OK(shardingState()->onStaleShardVersion(
+ operationContext(), nss, ChunkVersion(3, 0, initEpoch)));
+ });
+
+ onFindCommand([&nss, &initEpoch](const RemoteCommandRequest&) {  // attempt 1, collection document: still the old epoch
+ CollectionType coll;
+ coll.setNs(nss);
+ coll.setUpdatedAt(Date_t());
+ coll.setEpoch(initEpoch);
+ coll.setKeyPattern(BSON("x" << 1));
+ return std::vector<BSONObj>{coll.toBSON()};
+ });
+
+ // Now when the diff query is performed, it will get chunks with a different epoch.
+ const ChunkVersion newVersion(3, 0, OID::gen());  // only newVersion.epoch() is read below
+ onFindCommand([this, &nss, &newVersion](const RemoteCommandRequest& request) {  // attempt 1, chunk query
+ auto diffQueryStatus = QueryRequest::makeFromFindCommand(nss, request.cmdObj, false);
+ ASSERT_OK(diffQueryStatus.getStatus());
+
+ auto diffQuery = std::move(diffQueryStatus.getValue());
+ ASSERT_BSONOBJ_EQ(BSON("ns" << nss.ns() << "lastmod" << BSON("$gte" << Timestamp(2, 0))),
+ diffQuery->getFilter());  // filter starts at Timestamp(2, 0), matching the seeded (2, 0) version
+
+ ChunkType chunk;
+ chunk.setNS(nss.ns());
+ chunk.setMin(BSON("x" << 10));
+ chunk.setMax(BSON("x" << 20));
+ chunk.setShard(ShardId(shardName()));
+ chunk.setVersion(ChunkVersion(3, 10, newVersion.epoch()));  // epoch differs from the collection document just returned
+ return std::vector<BSONObj>{chunk.toBSON()};
+ });
+
+ // Retry the refresh again. Now doing a full reload.
+
+ onFindCommand([&nss, &newVersion](const RemoteCommandRequest&) {  // attempt 2, collection document: now the new epoch
+ CollectionType coll;
+ coll.setNs(nss);
+ coll.setUpdatedAt(Date_t());
+ coll.setEpoch(newVersion.epoch());
+ coll.setKeyPattern(BSON("x" << 1));
+ return std::vector<BSONObj>{coll.toBSON()};
+ });
+
+ onFindCommand([this, &nss, &newVersion](const RemoteCommandRequest& request) {  // attempt 2, chunk query
+ auto diffQueryStatus = QueryRequest::makeFromFindCommand(nss, request.cmdObj, false);
+ ASSERT_OK(diffQueryStatus.getStatus());
+
+ auto diffQuery = std::move(diffQueryStatus.getValue());
+ ASSERT_BSONOBJ_EQ(BSON("ns" << nss.ns() << "lastmod" << BSON("$gte" << Timestamp(0, 0))),
+ diffQuery->getFilter());  // filter now starts at Timestamp(0, 0) instead of the seeded version
+
+ ChunkType chunk;
+ chunk.setNS(nss.ns());
+ chunk.setMin(BSON("x" << 10));
+ chunk.setMax(BSON("x" << 20));
+ chunk.setShard(ShardId(shardName()));
+ chunk.setVersion(ChunkVersion(3, 10, newVersion.epoch()));  // epoch agrees with the attempt-2 collection document
+ return std::vector<BSONObj>{chunk.toBSON()};
+ });
+
+ future.timed_get(kFutureTimeout);  // wait for the async refresh (bounded by kFutureTimeout)
+}
+
+TEST_F(ShardingStateTest, FullMetadataOnEpochMismatchShouldStopAfterMaxRetries) {  // Every attempt sees an epoch mismatch; after three attempts the refresh must fail with RemoteChangeDetected.
+ ShardIdentityType shardIdentity;
+ shardIdentity.setConfigsvrConnString(
+ ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
+ shardIdentity.setShardName(shardName());
+ shardIdentity.setClusterId(OID::gen());
+
+ ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity));
+
+ const NamespaceString nss("test.user");
+ const OID initEpoch(OID::gen());
+
+ {  // Seed the shard's metadata with one chunk (min {x: 0}, max {x: 10}) at version (2, 0).
+ ChunkType chunk;
+ chunk.setNS(nss.ns());
+ chunk.setMin(BSON("x" << 0));
+ chunk.setMax(BSON("x" << 10));
+ chunk.setShard(ShardId(shardName()));
+ chunk.setVersion(ChunkVersion(2, 0, initEpoch));
+ setupCollectionMetadata(nss, initEpoch, std::vector<BSONObj>{chunk.toBSON()});
+ }
+
+
+ auto future = launchAsync([&] {  // refresh runs on another thread so this thread can answer the mocked finds
+ Client::initThreadIfNotAlready();
+ auto status = shardingState()->onStaleShardVersion(
+ operationContext(), nss, ChunkVersion(3, 0, initEpoch));
+ ASSERT_EQ(ErrorCodes::RemoteChangeDetected, status);
+ });
+
+ OID lastEpoch(initEpoch);  // epoch the next collection document will report
+ OID nextEpoch(OID::gen());  // epoch the next chunk reply will carry
+ for (int tries = 0; tries < 3; tries++) {  // NOTE(review): 3 presumably mirrors kMaxNumMetadataRefreshAttempts -- confirm
+ onFindCommand([&nss, &lastEpoch](const RemoteCommandRequest&) {  // collection document for this attempt
+ CollectionType coll;
+ coll.setNs(nss);
+ coll.setUpdatedAt(Date_t());
+ coll.setEpoch(lastEpoch);
+ coll.setKeyPattern(BSON("x" << 1));
+ return std::vector<BSONObj>{coll.toBSON()};
+ });
+
+ onFindCommand([this, &nss, &nextEpoch, tries](const RemoteCommandRequest& request) {  // chunk query for this attempt
+ auto diffQueryStatus = QueryRequest::makeFromFindCommand(nss, request.cmdObj, false);
+ ASSERT_OK(diffQueryStatus.getStatus());
+
+ auto diffQuery = std::move(diffQueryStatus.getValue());
+ Timestamp expectedLastMod = (tries == 0) ? Timestamp(2, 0) : Timestamp(0, 0);  // first attempt starts at the seeded version, later attempts at (0, 0)
+ ASSERT_BSONOBJ_EQ(
+ BSON("ns" << nss.ns() << "lastmod" << BSON("$gte" << expectedLastMod)),
+ diffQuery->getFilter());
+
+ ChunkType chunk;
+ chunk.setNS(nss.ns());
+ chunk.setMin(BSON("x" << 10));
+ chunk.setMax(BSON("x" << 20));
+ chunk.setShard(ShardId(shardName()));
+ chunk.setVersion(ChunkVersion(3, 10, nextEpoch));  // always differs from the collection document's lastEpoch
+ return std::vector<BSONObj>{chunk.toBSON()};
+ });
+
+ lastEpoch = nextEpoch;  // the next collection document reports the epoch the chunks just carried...
+ nextEpoch = OID::gen();  // ...and the next chunk reply gets a fresh one, so the mismatch repeats
+ }
+
+ future.timed_get(kFutureTimeout);  // wait for the async refresh (bounded by kFutureTimeout)
+}
+
+TEST_F(ShardingStateTest, MetadataRefreshShouldBeOkWhenCollectionWasDropped) {  // onStaleShardVersion must return OK when the collection document is marked dropped.
+ ShardIdentityType shardIdentity;
+ shardIdentity.setConfigsvrConnString(
+ ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
+ shardIdentity.setShardName(shardName());
+ shardIdentity.setClusterId(OID::gen());
+
+ ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity));
+
+ const NamespaceString nss("test.user");
+ const OID initEpoch(OID::gen());
+
+ {  // Seed the shard's metadata with one chunk (min {x: 0}, max {x: 10}) at version (2, 0).
+ ChunkType chunk;
+ chunk.setNS(nss.ns());
+ chunk.setMin(BSON("x" << 0));
+ chunk.setMax(BSON("x" << 10));
+ chunk.setShard(ShardId(shardName()));
+ chunk.setVersion(ChunkVersion(2, 0, initEpoch));
+ setupCollectionMetadata(nss, initEpoch, std::vector<BSONObj>{chunk.toBSON()});
+ }
+
+ const ChunkVersion newVersion(3, 0, initEpoch);
+ auto future = launchAsync([&] {  // refresh runs on another thread so this thread can answer the mocked find
+ Client::initThreadIfNotAlready();
+ ASSERT_OK(shardingState()->onStaleShardVersion(operationContext(), nss, newVersion));
+ });
+
+ onFindCommand([&nss, &initEpoch](const RemoteCommandRequest&) {  // the only find mocked in this test: the collection document
+ CollectionType coll;
+ coll.setNs(nss);
+ coll.setUpdatedAt(Date_t());
+ coll.setEpoch(initEpoch);
+ coll.setKeyPattern(BSON("x" << 1));
+ coll.setDropped(true);  // the reply marks the collection as dropped
+ return std::vector<BSONObj>{coll.toBSON()};
+ });
+
+ future.timed_get(kFutureTimeout);  // no chunk find is mocked; the refresh is expected to finish without one
+}
+
+TEST_F(ShardingStateTest, MetadataRefreshShouldNotRetryOtherTypesOfError) {  // A HostNotFound failure must be returned from onStaleShardVersion as-is.
+ ShardIdentityType shardIdentity;
+ shardIdentity.setConfigsvrConnString(
+ ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
+ shardIdentity.setShardName(shardName());
+ shardIdentity.setClusterId(OID::gen());
+
+ ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity));
+
+ const NamespaceString nss("test.user");
+ const OID initEpoch(OID::gen());
+
+ {  // Seed the shard's metadata with one chunk (min {x: 0}, max {x: 10}) at version (2, 0).
+ ChunkType chunk;
+ chunk.setNS(nss.ns());
+ chunk.setMin(BSON("x" << 0));
+ chunk.setMax(BSON("x" << 10));
+ chunk.setShard(ShardId(shardName()));
+ chunk.setVersion(ChunkVersion(2, 0, initEpoch));
+ setupCollectionMetadata(nss, initEpoch, std::vector<BSONObj>{chunk.toBSON()});
+ }
+
+ auto configTargeter =
+ RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter());
+ configTargeter->setFindHostReturnValue({ErrorCodes::HostNotFound, "host erased by test"});  // the config shard's mock targeter now reports HostNotFound
+
+ auto status = shardingState()->onStaleShardVersion(
+ operationContext(), nss, ChunkVersion(3, 0, initEpoch));  // called on the test thread; no find replies are mocked
+ ASSERT_EQ(ErrorCodes::HostNotFound, status);
+}
+
} // namespace
} // namespace mongo