summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Siyuan Zhou <siyuan.zhou@mongodb.com> 2017-01-19 17:20:12 -0500
committer: Siyuan Zhou <siyuan.zhou@mongodb.com> 2017-01-19 17:20:12 -0500
commit: d3e67186d1f9c633e8e69ebb7bf2418d3850688a (patch)
tree: 570532480170d0bc8bc757ea9f517c1ac987c18d
parent: 9edfc4c8ba273d54ecdc31c1fc0eb8c6a42ccbc4 (diff)
download: mongo-d3e67186d1f9c633e8e69ebb7bf2418d3850688a.tar.gz
Revert "SERVER-26791 Shard metadata commands should perform partial refresh as much as possible"
This reverts commit 6add2c82bacecd5f54613ebf4be1553f3b046cbc.
-rw-r--r-- buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml 1
-rw-r--r-- src/mongo/db/s/SConscript 3
-rw-r--r-- src/mongo/db/s/sharding_state.cpp 104
-rw-r--r-- src/mongo/db/s/sharding_state_test.cpp 381
4 files changed, 78 insertions, 411 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml
index 9213045e0e6..462d8cff458 100644
--- a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml
@@ -66,7 +66,6 @@ selector:
- jstests/sharding/listDatabases.js
- jstests/sharding/bulk_insert.js
- jstests/sharding/printShardingStatus.js
- - jstests/sharding/cmds_epoch_mismatch.js
# Balancer writes (direct write to config database with no retries)
- jstests/sharding/convert_to_and_from_sharded.js
- jstests/sharding/remove2.js
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index c82b0701de3..eac2ad60283 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -226,9 +226,6 @@ env.CppUnitTest(
'sharding_state_test.cpp',
],
LIBDEPS=[
- '$BUILD_DIR/mongo/db/query/query_request',
- '$BUILD_DIR/mongo/s/catalog/dist_lock_manager_mock',
- '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl',
'$BUILD_DIR/mongo/s/catalog/sharding_catalog_mock',
'$BUILD_DIR/mongo/s/sharding_mongod_test_fixture',
],
diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp
index 258f5191b9f..ab9ca2c7dd7 100644
--- a/src/mongo/db/s/sharding_state.cpp
+++ b/src/mongo/db/s/sharding_state.cpp
@@ -273,24 +273,41 @@ Status ShardingState::onStaleShardVersion(OperationContext* txn,
}
}
- auto refreshStatusAndVersion =
- _refreshMetadata(txn, nss, (currentMetadata ? currentMetadata.getMetadata() : nullptr));
- return refreshStatusAndVersion.getStatus();
-}
+ // At the first attempt try to use the currently loaded metadata and on subsequent attempts use
+ // the complete metadata
+ int numRefreshAttempts = 0;
+
+ while (true) {
+ numRefreshAttempts++;
+
+ auto refreshStatusAndVersion =
+ _refreshMetadata(txn, nss, (currentMetadata ? currentMetadata.getMetadata() : nullptr));
+ if (refreshStatusAndVersion.isOK()) {
+ LOG(1) << "Successfully refreshed metadata for " << nss.ns() << " to "
+ << refreshStatusAndVersion.getValue();
+ return Status::OK();
+ }
-Status ShardingState::refreshMetadataNow(OperationContext* txn,
- const NamespaceString& nss,
- ChunkVersion* latestShardVersion) {
- ScopedCollectionMetadata currentMetadata;
+ if (refreshStatusAndVersion == ErrorCodes::RemoteChangeDetected &&
+ numRefreshAttempts < kMaxNumMetadataRefreshAttempts) {
+ currentMetadata = ScopedCollectionMetadata();
- {
- AutoGetCollection autoColl(txn, nss, MODE_IS);
+ log() << "Refresh failed and will be retried as full reload "
+ << refreshStatusAndVersion.getStatus();
+ continue;
+ }
- currentMetadata = CollectionShardingState::get(txn, nss)->getMetadata();
+ return refreshStatusAndVersion.getStatus();
}
- auto refreshLatestShardVersionStatus =
- _refreshMetadata(txn, nss, currentMetadata.getMetadata());
+
+ MONGO_UNREACHABLE;
+}
+
+Status ShardingState::refreshMetadataNow(OperationContext* txn,
+ const NamespaceString& nss,
+ ChunkVersion* latestShardVersion) {
+ auto refreshLatestShardVersionStatus = _refreshMetadata(txn, nss, nullptr);
if (!refreshLatestShardVersionStatus.isOK()) {
return refreshLatestShardVersionStatus.getStatus();
}
@@ -651,34 +668,24 @@ StatusWith<ChunkVersion> ShardingState::_refreshMetadata(
}
}
- Status status = {ErrorCodes::InternalError, "metadata refresh not performed"};
+ // The _configServerTickets serializes this process such that only a small number of threads can
+ // try to refresh at the same time in order to avoid overloading the config server
+ _configServerTickets.waitForTicket();
+ TicketHolderReleaser needTicketFrom(&_configServerTickets);
+
Timer t;
- int numAttempts = 0;
- std::unique_ptr<CollectionMetadata> remoteMetadata;
-
- do {
- // The _configServerTickets serializes this process such that only a small number of threads
- // can try to refresh at the same time in order to avoid overloading the config server.
- _configServerTickets.waitForTicket();
- TicketHolderReleaser needTicketFrom(&_configServerTickets);
-
- if (status == ErrorCodes::RemoteChangeDetected) {
- metadataForDiff = nullptr;
- log() << "Refresh failed and will be retried as full reload " << status;
- }
- log() << "MetadataLoader loading chunks for " << nss.ns() << " based on: "
- << (metadataForDiff ? metadataForDiff->getCollVersion().toString() : "(empty)");
+ log() << "MetadataLoader loading chunks for " << nss.ns() << " based on: "
+ << (metadataForDiff ? metadataForDiff->getCollVersion().toString() : "(empty)");
+
+ std::unique_ptr<CollectionMetadata> remoteMetadata(stdx::make_unique<CollectionMetadata>());
- remoteMetadata = stdx::make_unique<CollectionMetadata>();
- status = MetadataLoader::makeCollectionMetadata(txn,
- grid.catalogClient(txn),
- nss.ns(),
- getShardName(),
- metadataForDiff,
- remoteMetadata.get());
- } while (status == ErrorCodes::RemoteChangeDetected &&
- ++numAttempts < kMaxNumMetadataRefreshAttempts);
+ Status status = MetadataLoader::makeCollectionMetadata(txn,
+ grid.catalogClient(txn),
+ nss.ns(),
+ getShardName(),
+ metadataForDiff,
+ remoteMetadata.get());
if (!status.isOK() && status != ErrorCodes::NamespaceNotFound) {
warning() << "MetadataLoader failed after " << t.millis() << " ms"
@@ -693,23 +700,24 @@ StatusWith<ChunkVersion> ShardingState::_refreshMetadata(
auto css = CollectionShardingState::get(txn, nss);
- if (!status.isOK()) {
- invariant(status == ErrorCodes::NamespaceNotFound);
- css->refreshMetadata(txn, nullptr);
+ if (status.isOK()) {
+ css->refreshMetadata(txn, std::move(remoteMetadata));
+
+ auto metadata = css->getMetadata();
- log() << "MetadataLoader took " << t.millis() << " ms and did not find the namespace";
+ log() << "MetadataLoader took " << t.millis() << " ms and found version "
+ << metadata->getCollVersion();
- return ChunkVersion::UNSHARDED();
+ return metadata->getShardVersion();
}
- css->refreshMetadata(txn, std::move(remoteMetadata));
+ invariant(status == ErrorCodes::NamespaceNotFound);
- auto metadata = css->getMetadata();
+ css->refreshMetadata(txn, nullptr);
- log() << "MetadataLoader took " << t.millis() << " ms and found version "
- << metadata->getCollVersion();
+ log() << "MetadataLoader took " << t.millis() << " ms and did not find the namespace";
- return metadata->getShardVersion();
+ return ChunkVersion::UNSHARDED();
}
StatusWith<ScopedRegisterDonateChunk> ShardingState::registerDonateChunk(
diff --git a/src/mongo/db/s/sharding_state_test.cpp b/src/mongo/db/s/sharding_state_test.cpp
index e4953f3488c..0156058676d 100644
--- a/src/mongo/db/s/sharding_state_test.cpp
+++ b/src/mongo/db/s/sharding_state_test.cpp
@@ -33,17 +33,12 @@
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/namespace_string.h"
-#include "mongo/db/query/query_request.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/s/type_shard_identity.h"
#include "mongo/db/server_options.h"
#include "mongo/db/service_context_noop.h"
#include "mongo/db/storage/storage_options.h"
-#include "mongo/s/catalog/dist_lock_manager_mock.h"
-#include "mongo/s/catalog/sharding_catalog_client_impl.h"
-#include "mongo/s/catalog/type_chunk.h"
-#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/sharding_mongod_test_fixture.h"
@@ -59,42 +54,11 @@ public:
return &_shardingState;
}
- std::string shardName() const {
- return _shardName.toString();
- }
-
- void setupCollectionMetadata(const NamespaceString& nss,
- const OID& epoch,
- const std::vector<BSONObj>& initChunks) {
- auto future = launchAsync([this, &nss] {
- ChunkVersion latestShardVersion;
- Client::initThreadIfNotAlready();
- ASSERT_OK(
- shardingState()->refreshMetadataNow(operationContext(), nss, &latestShardVersion));
- });
-
- ChunkVersion initVersion(1, 0, epoch);
- onFindCommand([&nss, &initVersion](const RemoteCommandRequest&) {
- CollectionType coll;
- coll.setNs(nss);
- coll.setUpdatedAt(Date_t());
- coll.setEpoch(initVersion.epoch());
- coll.setKeyPattern(BSON("x" << 1));
- return std::vector<BSONObj>{coll.toBSON()};
- });
-
- onFindCommand([&initChunks](const RemoteCommandRequest&) { return initChunks; });
-
- future.timed_get(kFutureTimeout);
- }
-
protected:
// Used to write to set up local collections before exercising server logic.
std::unique_ptr<DBDirectClient> _dbDirectClient;
void setUp() override {
- _shardName = ShardId("a");
-
serverGlobalParams.clusterRole = ClusterRole::None;
ShardingMongodTestFixture::setUp();
@@ -112,7 +76,6 @@ protected:
auto configTargeter =
RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter());
configTargeter->setConnectionStringReturnValue(configConnStr);
- configTargeter->setFindHostReturnValue(configConnStr.getServers()[0]);
return Status::OK();
});
@@ -133,32 +96,20 @@ protected:
ShardingMongodTestFixture::tearDown();
}
- std::unique_ptr<DistLockManager> makeDistLockManager(
- std::unique_ptr<DistLockCatalog> distLockCatalog) override {
- return stdx::make_unique<DistLockManagerMock>(nullptr);
- }
-
- std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient(
- std::unique_ptr<DistLockManager> distLockManager) override {
- invariant(distLockManager);
- return stdx::make_unique<ShardingCatalogClientImpl>(std::move(distLockManager));
- }
-
private:
ShardingState _shardingState;
- ShardId _shardName;
};
TEST_F(ShardingStateTest, ValidShardIdentitySucceeds) {
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity.setShardName(shardName());
+ shardIdentity.setShardName("a");
shardIdentity.setClusterId(OID::gen());
ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity));
ASSERT_TRUE(shardingState()->enabled());
- ASSERT_EQ(shardName(), shardingState()->getShardName());
+ ASSERT_EQ("a", shardingState()->getShardName());
ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(operationContext()).toString());
}
@@ -166,7 +117,7 @@ TEST_F(ShardingStateTest, InitWhilePreviouslyInErrorStateWillStayInErrorState) {
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity.setShardName(shardName());
+ shardIdentity.setShardName("a");
shardIdentity.setClusterId(OID::gen());
shardingState()->setGlobalInitMethodForTest(
@@ -201,7 +152,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) {
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity.setShardName(shardName());
+ shardIdentity.setShardName("a");
shardIdentity.setClusterId(clusterID);
ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity));
@@ -209,7 +160,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) {
ShardIdentityType shardIdentity2;
shardIdentity2.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity2.setShardName(shardName());
+ shardIdentity2.setShardName("a");
shardIdentity2.setClusterId(clusterID);
shardingState()->setGlobalInitMethodForTest(
@@ -220,7 +171,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) {
ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity2));
ASSERT_TRUE(shardingState()->enabled());
- ASSERT_EQ(shardName(), shardingState()->getShardName());
+ ASSERT_EQ("a", shardingState()->getShardName());
ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(operationContext()).toString());
}
@@ -229,7 +180,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) {
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity.setShardName(shardName());
+ shardIdentity.setShardName("a");
shardIdentity.setClusterId(clusterID);
ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity));
@@ -237,7 +188,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) {
ShardIdentityType shardIdentity2;
shardIdentity2.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "b:2,c:3", "config"));
- shardIdentity2.setShardName(shardName());
+ shardIdentity2.setShardName("a");
shardIdentity2.setClusterId(clusterID);
shardingState()->setGlobalInitMethodForTest(
@@ -248,7 +199,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) {
ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity2));
ASSERT_TRUE(shardingState()->enabled());
- ASSERT_EQ(shardName(), shardingState()->getShardName());
+ ASSERT_EQ("a", shardingState()->getShardName());
ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(operationContext()).toString());
}
@@ -257,7 +208,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentReplSetNameFails) {
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity.setShardName(shardName());
+ shardIdentity.setShardName("a");
shardIdentity.setClusterId(clusterID);
ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity));
@@ -265,7 +216,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentReplSetNameFails) {
ShardIdentityType shardIdentity2;
shardIdentity2.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "configRS"));
- shardIdentity2.setShardName(shardName());
+ shardIdentity2.setShardName("a");
shardIdentity2.setClusterId(clusterID);
shardingState()->setGlobalInitMethodForTest(
@@ -277,7 +228,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentReplSetNameFails) {
ASSERT_EQ(ErrorCodes::InconsistentShardIdentity, status);
ASSERT_TRUE(shardingState()->enabled());
- ASSERT_EQ(shardName(), shardingState()->getShardName());
+ ASSERT_EQ("a", shardingState()->getShardName());
ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(operationContext()).toString());
}
@@ -286,7 +237,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentShardNameFails) {
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity.setShardName(shardName());
+ shardIdentity.setShardName("a");
shardIdentity.setClusterId(clusterID);
ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity));
@@ -306,7 +257,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentShardNameFails) {
ASSERT_EQ(ErrorCodes::InconsistentShardIdentity, status);
ASSERT_TRUE(shardingState()->enabled());
- ASSERT_EQ(shardName(), shardingState()->getShardName());
+ ASSERT_EQ("a", shardingState()->getShardName());
ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(operationContext()).toString());
}
@@ -314,7 +265,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentClusterIdFails) {
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity.setShardName(shardName());
+ shardIdentity.setShardName("a");
shardIdentity.setClusterId(OID::gen());
ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity));
@@ -322,7 +273,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentClusterIdFails) {
ShardIdentityType shardIdentity2;
shardIdentity2.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity2.setShardName(shardName());
+ shardIdentity2.setShardName("a");
shardIdentity2.setClusterId(OID::gen());
shardingState()->setGlobalInitMethodForTest(
@@ -334,7 +285,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentClusterIdFails) {
ASSERT_EQ(ErrorCodes::InconsistentShardIdentity, status);
ASSERT_TRUE(shardingState()->enabled());
- ASSERT_EQ(shardName(), shardingState()->getShardName());
+ ASSERT_EQ("a", shardingState()->getShardName());
ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(operationContext()).toString());
}
@@ -376,7 +327,7 @@ TEST_F(ShardingStateTest,
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity.setShardName(shardName());
+ shardIdentity.setShardName("a");
shardIdentity.setClusterId(OID::gen());
ASSERT_OK(shardIdentity.validate());
serverGlobalParams.overrideShardIdentity = shardIdentity.toBSON();
@@ -424,7 +375,7 @@ TEST_F(ShardingStateTest,
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity.setShardName(shardName());
+ shardIdentity.setShardName("a");
shardIdentity.setClusterId(OID::gen());
ASSERT_OK(shardIdentity.validate());
serverGlobalParams.overrideShardIdentity = shardIdentity.toBSON();
@@ -464,7 +415,7 @@ TEST_F(ShardingStateTest,
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity.setShardName(shardName());
+ shardIdentity.setShardName("a");
shardIdentity.setClusterId(OID::gen());
ASSERT_OK(shardIdentity.validate());
serverGlobalParams.overrideShardIdentity = shardIdentity.toBSON();
@@ -534,7 +485,7 @@ TEST_F(ShardingStateTest,
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity.setShardName(shardName());
+ shardIdentity.setShardName("a");
shardIdentity.setClusterId(OID::gen());
ASSERT_OK(shardIdentity.validate());
BSONObj validShardIdentity = shardIdentity.toBSON();
@@ -594,7 +545,7 @@ TEST_F(ShardingStateTest,
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity.setShardName(shardName());
+ shardIdentity.setShardName("a");
shardIdentity.setClusterId(OID::gen());
ASSERT_OK(shardIdentity.validate());
BSONObj validShardIdentity = shardIdentity.toBSON();
@@ -609,293 +560,5 @@ TEST_F(ShardingStateTest,
ASSERT_FALSE(swShardingInitialized.getValue());
}
-TEST_F(ShardingStateTest, MetadataRefreshShouldUseDiffQuery) {
- ShardIdentityType shardIdentity;
- shardIdentity.setConfigsvrConnString(
- ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity.setShardName(shardName());
- shardIdentity.setClusterId(OID::gen());
-
- ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity));
-
- const NamespaceString nss("test.user");
- const OID initEpoch(OID::gen());
-
- {
- ChunkType chunk;
- chunk.setNS(nss.ns());
- chunk.setMin(BSON("x" << 0));
- chunk.setMax(BSON("x" << 10));
- chunk.setShard(ShardId(shardName()));
- chunk.setVersion(ChunkVersion(2, 0, initEpoch));
- setupCollectionMetadata(nss, initEpoch, std::vector<BSONObj>{chunk.toBSON()});
- }
-
- const ChunkVersion newVersion(3, 0, initEpoch);
- auto future = launchAsync([&] {
- Client::initThreadIfNotAlready();
- ASSERT_OK(shardingState()->onStaleShardVersion(operationContext(), nss, newVersion));
- });
-
- onFindCommand([&nss, &initEpoch](const RemoteCommandRequest&) {
- CollectionType coll;
- coll.setNs(nss);
- coll.setUpdatedAt(Date_t());
- coll.setEpoch(initEpoch);
- coll.setKeyPattern(BSON("x" << 1));
- return std::vector<BSONObj>{coll.toBSON()};
- });
-
- onFindCommand([this, &nss, &initEpoch](const RemoteCommandRequest& request) {
- auto diffQueryStatus = QueryRequest::makeFromFindCommand(nss, request.cmdObj, false);
- ASSERT_OK(diffQueryStatus.getStatus());
-
- auto diffQuery = std::move(diffQueryStatus.getValue());
- ASSERT_BSONOBJ_EQ(BSON("ns" << nss.ns() << "lastmod" << BSON("$gte" << Timestamp(2, 0))),
- diffQuery->getFilter());
-
- ChunkType chunk;
- chunk.setNS(nss.ns());
- chunk.setMin(BSON("x" << 10));
- chunk.setMax(BSON("x" << 20));
- chunk.setShard(ShardId(shardName()));
- chunk.setVersion(ChunkVersion(3, 10, initEpoch));
- return std::vector<BSONObj>{chunk.toBSON()};
- });
-
- future.timed_get(kFutureTimeout);
-}
-
-/**
- * Test where the epoch changed right before the chunk diff query.
- */
-TEST_F(ShardingStateTest, MetadataRefreshShouldUseFullQueryOnEpochMismatch) {
- ShardIdentityType shardIdentity;
- shardIdentity.setConfigsvrConnString(
- ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity.setShardName(shardName());
- shardIdentity.setClusterId(OID::gen());
-
- ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity));
-
- const NamespaceString nss("test.user");
- const OID initEpoch(OID::gen());
-
- {
- ChunkType chunk;
- chunk.setNS(nss.ns());
- chunk.setMin(BSON("x" << 0));
- chunk.setMax(BSON("x" << 10));
- chunk.setShard(ShardId(shardName()));
- chunk.setVersion(ChunkVersion(2, 0, initEpoch));
- setupCollectionMetadata(nss, initEpoch, std::vector<BSONObj>{chunk.toBSON()});
- }
-
-
- auto future = launchAsync([&] {
- Client::initThreadIfNotAlready();
- ASSERT_OK(shardingState()->onStaleShardVersion(
- operationContext(), nss, ChunkVersion(3, 0, initEpoch)));
- });
-
- onFindCommand([&nss, &initEpoch](const RemoteCommandRequest&) {
- CollectionType coll;
- coll.setNs(nss);
- coll.setUpdatedAt(Date_t());
- coll.setEpoch(initEpoch);
- coll.setKeyPattern(BSON("x" << 1));
- return std::vector<BSONObj>{coll.toBSON()};
- });
-
- // Now when the diff query is performed, it will get chunks with a different epoch.
- const ChunkVersion newVersion(3, 0, OID::gen());
- onFindCommand([this, &nss, &newVersion](const RemoteCommandRequest& request) {
- auto diffQueryStatus = QueryRequest::makeFromFindCommand(nss, request.cmdObj, false);
- ASSERT_OK(diffQueryStatus.getStatus());
-
- auto diffQuery = std::move(diffQueryStatus.getValue());
- ASSERT_BSONOBJ_EQ(BSON("ns" << nss.ns() << "lastmod" << BSON("$gte" << Timestamp(2, 0))),
- diffQuery->getFilter());
-
- ChunkType chunk;
- chunk.setNS(nss.ns());
- chunk.setMin(BSON("x" << 10));
- chunk.setMax(BSON("x" << 20));
- chunk.setShard(ShardId(shardName()));
- chunk.setVersion(ChunkVersion(3, 10, newVersion.epoch()));
- return std::vector<BSONObj>{chunk.toBSON()};
- });
-
- // Retry the refresh again. Now doing a full reload.
-
- onFindCommand([&nss, &newVersion](const RemoteCommandRequest&) {
- CollectionType coll;
- coll.setNs(nss);
- coll.setUpdatedAt(Date_t());
- coll.setEpoch(newVersion.epoch());
- coll.setKeyPattern(BSON("x" << 1));
- return std::vector<BSONObj>{coll.toBSON()};
- });
-
- onFindCommand([this, &nss, &newVersion](const RemoteCommandRequest& request) {
- auto diffQueryStatus = QueryRequest::makeFromFindCommand(nss, request.cmdObj, false);
- ASSERT_OK(diffQueryStatus.getStatus());
-
- auto diffQuery = std::move(diffQueryStatus.getValue());
- ASSERT_BSONOBJ_EQ(BSON("ns" << nss.ns() << "lastmod" << BSON("$gte" << Timestamp(0, 0))),
- diffQuery->getFilter());
-
- ChunkType chunk;
- chunk.setNS(nss.ns());
- chunk.setMin(BSON("x" << 10));
- chunk.setMax(BSON("x" << 20));
- chunk.setShard(ShardId(shardName()));
- chunk.setVersion(ChunkVersion(3, 10, newVersion.epoch()));
- return std::vector<BSONObj>{chunk.toBSON()};
- });
-
- future.timed_get(kFutureTimeout);
-}
-
-TEST_F(ShardingStateTest, FullMetadataOnEpochMismatchShouldStopAfterMaxRetries) {
- ShardIdentityType shardIdentity;
- shardIdentity.setConfigsvrConnString(
- ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity.setShardName(shardName());
- shardIdentity.setClusterId(OID::gen());
-
- ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity));
-
- const NamespaceString nss("test.user");
- const OID initEpoch(OID::gen());
-
- {
- ChunkType chunk;
- chunk.setNS(nss.ns());
- chunk.setMin(BSON("x" << 0));
- chunk.setMax(BSON("x" << 10));
- chunk.setShard(ShardId(shardName()));
- chunk.setVersion(ChunkVersion(2, 0, initEpoch));
- setupCollectionMetadata(nss, initEpoch, std::vector<BSONObj>{chunk.toBSON()});
- }
-
-
- auto future = launchAsync([&] {
- Client::initThreadIfNotAlready();
- auto status = shardingState()->onStaleShardVersion(
- operationContext(), nss, ChunkVersion(3, 0, initEpoch));
- ASSERT_EQ(ErrorCodes::RemoteChangeDetected, status);
- });
-
- OID lastEpoch(initEpoch);
- OID nextEpoch(OID::gen());
- for (int tries = 0; tries < 3; tries++) {
- onFindCommand([&nss, &lastEpoch](const RemoteCommandRequest&) {
- CollectionType coll;
- coll.setNs(nss);
- coll.setUpdatedAt(Date_t());
- coll.setEpoch(lastEpoch);
- coll.setKeyPattern(BSON("x" << 1));
- return std::vector<BSONObj>{coll.toBSON()};
- });
-
- onFindCommand([this, &nss, &nextEpoch, tries](const RemoteCommandRequest& request) {
- auto diffQueryStatus = QueryRequest::makeFromFindCommand(nss, request.cmdObj, false);
- ASSERT_OK(diffQueryStatus.getStatus());
-
- auto diffQuery = std::move(diffQueryStatus.getValue());
- Timestamp expectedLastMod = (tries == 0) ? Timestamp(2, 0) : Timestamp(0, 0);
- ASSERT_BSONOBJ_EQ(
- BSON("ns" << nss.ns() << "lastmod" << BSON("$gte" << expectedLastMod)),
- diffQuery->getFilter());
-
- ChunkType chunk;
- chunk.setNS(nss.ns());
- chunk.setMin(BSON("x" << 10));
- chunk.setMax(BSON("x" << 20));
- chunk.setShard(ShardId(shardName()));
- chunk.setVersion(ChunkVersion(3, 10, nextEpoch));
- return std::vector<BSONObj>{chunk.toBSON()};
- });
-
- lastEpoch = nextEpoch;
- nextEpoch = OID::gen();
- }
-
- future.timed_get(kFutureTimeout);
-}
-
-TEST_F(ShardingStateTest, MetadataRefreshShouldBeOkWhenCollectionWasDropped) {
- ShardIdentityType shardIdentity;
- shardIdentity.setConfigsvrConnString(
- ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity.setShardName(shardName());
- shardIdentity.setClusterId(OID::gen());
-
- ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity));
-
- const NamespaceString nss("test.user");
- const OID initEpoch(OID::gen());
-
- {
- ChunkType chunk;
- chunk.setNS(nss.ns());
- chunk.setMin(BSON("x" << 0));
- chunk.setMax(BSON("x" << 10));
- chunk.setShard(ShardId(shardName()));
- chunk.setVersion(ChunkVersion(2, 0, initEpoch));
- setupCollectionMetadata(nss, initEpoch, std::vector<BSONObj>{chunk.toBSON()});
- }
-
- const ChunkVersion newVersion(3, 0, initEpoch);
- auto future = launchAsync([&] {
- Client::initThreadIfNotAlready();
- ASSERT_OK(shardingState()->onStaleShardVersion(operationContext(), nss, newVersion));
- });
-
- onFindCommand([&nss, &initEpoch](const RemoteCommandRequest&) {
- CollectionType coll;
- coll.setNs(nss);
- coll.setUpdatedAt(Date_t());
- coll.setEpoch(initEpoch);
- coll.setKeyPattern(BSON("x" << 1));
- coll.setDropped(true);
- return std::vector<BSONObj>{coll.toBSON()};
- });
-
- future.timed_get(kFutureTimeout);
-}
-
-TEST_F(ShardingStateTest, MetadataRefreshShouldNotRetryOtherTypesOfError) {
- ShardIdentityType shardIdentity;
- shardIdentity.setConfigsvrConnString(
- ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity.setShardName(shardName());
- shardIdentity.setClusterId(OID::gen());
-
- ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity));
-
- const NamespaceString nss("test.user");
- const OID initEpoch(OID::gen());
-
- {
- ChunkType chunk;
- chunk.setNS(nss.ns());
- chunk.setMin(BSON("x" << 0));
- chunk.setMax(BSON("x" << 10));
- chunk.setShard(ShardId(shardName()));
- chunk.setVersion(ChunkVersion(2, 0, initEpoch));
- setupCollectionMetadata(nss, initEpoch, std::vector<BSONObj>{chunk.toBSON()});
- }
-
- auto configTargeter =
- RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter());
- configTargeter->setFindHostReturnValue({ErrorCodes::HostNotFound, "host erased by test"});
-
- auto status = shardingState()->onStaleShardVersion(
- operationContext(), nss, ChunkVersion(3, 0, initEpoch));
- ASSERT_EQ(ErrorCodes::HostNotFound, status);
-}
-
} // namespace
} // namespace mongo