summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Taskov <alex.taskov@mongodb.com>2020-09-29 19:42:59 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-09-30 00:37:23 +0000
commit253740ba90154d8f9330553f142e177db47f4910 (patch)
treede65d3ca3d795e0dffc65edda2d1499e14b254a8
parenteb2b72cf9c0269f086223d499ac9be8a270d268c (diff)
downloadmongo-253740ba90154d8f9330553f142e177db47f4910.tar.gz
SERVER-49822 Add destined recipient to oplog entries from inserts
-rw-r--r--src/mongo/db/op_observer_impl.cpp1
-rw-r--r--src/mongo/db/repl/oplog.cpp1
-rw-r--r--src/mongo/db/s/SConscript3
-rw-r--r--src/mongo/db/s/resharding_destined_recipient_test.cpp333
-rw-r--r--src/mongo/db/s/resharding_util.cpp11
-rw-r--r--src/mongo/db/s/resharding_util.h2
-rw-r--r--src/mongo/s/catalog_cache_loader_mock.cpp50
-rw-r--r--src/mongo/s/catalog_cache_loader_mock.h21
8 files changed, 399 insertions, 23 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index bb187fc2149..0aa06762031 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -454,6 +454,7 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
for (auto iter = first; iter != last; iter++) {
auto operation = MutableOplogEntry::makeInsertOperation(nss, uuid.get(), iter->doc);
+ shardAnnotateOplogEntry(opCtx, nss, iter->doc, operation);
txnParticipant.addTransactionOperation(opCtx, operation);
}
} else {
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 23c148b7f75..c79267245db 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -393,6 +393,7 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx,
}
oplogEntry.setObject(begin[i].doc);
oplogEntry.setOpTime(insertStatementOplogSlot);
+ oplogEntry.setDestinedRecipient(getDestinedRecipient(opCtx, nss, begin[i].doc));
OplogLink oplogLink;
if (i > 0)
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 1f005757352..138ca82d3f2 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -433,6 +433,7 @@ env.CppUnitTest(
'migration_session_id_test.cpp',
'migration_util_test.cpp',
'namespace_metadata_change_notifications_test.cpp',
+ 'resharding_destined_recipient_test.cpp',
'session_catalog_migration_destination_test.cpp',
'session_catalog_migration_source_test.cpp',
'shard_local_test.cpp',
@@ -453,10 +454,12 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/ops/write_ops_exec',
'$BUILD_DIR/mongo/db/query/query_request',
'$BUILD_DIR/mongo/db/repl/mock_repl_coord_server_fixture',
+ '$BUILD_DIR/mongo/db/repl/oplog_interface_local',
'$BUILD_DIR/mongo/db/repl/storage_interface_impl',
'$BUILD_DIR/mongo/db/repl/wait_for_majority_service',
'$BUILD_DIR/mongo/s/catalog/dist_lock_manager_mock',
'$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_mock',
+ 'resharding_util',
'shard_server_test_fixture',
'sharding_logging',
'sharding_runtime_d',
diff --git a/src/mongo/db/s/resharding_destined_recipient_test.cpp b/src/mongo/db/s/resharding_destined_recipient_test.cpp
new file mode 100644
index 00000000000..86ce0a174da
--- /dev/null
+++ b/src/mongo/db/s/resharding_destined_recipient_test.cpp
@@ -0,0 +1,333 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/catalog_raii.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/repl/apply_ops.h"
+#include "mongo/db/repl/oplog_interface_local.h"
+#include "mongo/db/repl/wait_for_majority_service.h"
+#include "mongo/db/s/collection_sharding_runtime.h"
+#include "mongo/db/s/operation_sharding_state.h"
+#include "mongo/db/s/resharding_util.h"
+#include "mongo/db/s/shard_filtering_metadata_refresh.h"
+#include "mongo/db/s/shard_server_test_fixture.h"
+#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/session_catalog_mongod.h"
+#include "mongo/db/transaction_participant.h"
+#include "mongo/s/catalog/sharding_catalog_client_mock.h"
+#include "mongo/s/catalog/type_shard.h"
+#include "mongo/s/catalog_cache_loader_mock.h"
+#include "mongo/s/database_version_helpers.h"
+#include "mongo/s/shard_id.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+class DestinedRecipientTest : public ShardServerTestFixture {
+public:
+ const NamespaceString kNss{"test.foo"};
+ const std::string kShardKey = "x";
+ const HostAndPort kConfigHostAndPort{"DummyConfig", 12345};
+ const std::vector<ShardType> kShardList = {ShardType("shard0", "Host0:12345"),
+ ShardType("shard1", "Host1:12345")};
+
+ void setUp() override {
+ // Don't call ShardServerTestFixture::setUp so we can install a mock catalog cache loader.
+ ShardingMongodTestFixture::setUp();
+
+ replicationCoordinator()->alwaysAllowWrites(true);
+ serverGlobalParams.clusterRole = ClusterRole::ShardServer;
+
+ _clusterId = OID::gen();
+ ShardingState::get(getServiceContext())
+ ->setInitialized(kShardList[0].getName(), _clusterId);
+
+ auto mockLoader = std::make_unique<CatalogCacheLoaderMock>();
+ _mockCatalogCacheLoader = mockLoader.get();
+ CatalogCacheLoader::set(getServiceContext(), std::move(mockLoader));
+
+ uassertStatusOK(
+ initializeGlobalShardingStateForMongodForTest(ConnectionString(kConfigHostAndPort)));
+
+ configTargeterMock()->setFindHostReturnValue(kConfigHostAndPort);
+
+ WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext());
+
+ for (const auto& shard : kShardList) {
+ std::unique_ptr<RemoteCommandTargeterMock> targeter(
+ std::make_unique<RemoteCommandTargeterMock>());
+ HostAndPort host(shard.getHost());
+ targeter->setConnectionStringReturnValue(ConnectionString(host));
+ targeter->setFindHostReturnValue(host);
+ targeterFactory()->addTargeterToReturn(ConnectionString(host), std::move(targeter));
+ }
+ }
+
+ void tearDown() override {
+ WaitForMajorityService::get(getServiceContext()).shutDown();
+
+ ShardServerTestFixture::tearDown();
+ }
+
+ class StaticCatalogClient final : public ShardingCatalogClientMock {
+ public:
+ StaticCatalogClient(std::vector<ShardType> shards)
+ : ShardingCatalogClientMock(nullptr), _shards(std::move(shards)) {}
+
+ StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards(
+ OperationContext* opCtx, repl::ReadConcernLevel readConcern) override {
+ return repl::OpTimeWith<std::vector<ShardType>>(_shards);
+ }
+
+ StatusWith<std::vector<CollectionType>> getCollections(
+ OperationContext* opCtx,
+ const std::string* dbName,
+ repl::OpTime* optime,
+ repl::ReadConcernLevel readConcernLevel) override {
+ return _colls;
+ }
+
+ void setCollections(std::vector<CollectionType> colls) {
+ _colls = std::move(colls);
+ }
+
+ private:
+ const std::vector<ShardType> _shards;
+ std::vector<CollectionType> _colls;
+ };
+
+ std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient(
+ std::unique_ptr<DistLockManager> distLockManager) override {
+ return std::make_unique<StaticCatalogClient>(kShardList);
+ }
+
+protected:
+ CollectionType createCollection(const OID& epoch) {
+ CollectionType coll;
+
+ coll.setNs(kNss);
+ coll.setEpoch(epoch);
+ coll.setKeyPattern(BSON(kShardKey << 1));
+ coll.setUnique(false);
+ coll.setUUID(UUID::gen());
+
+ return coll;
+ }
+
+ std::vector<ChunkType> createChunks(const OID& epoch, const std::string& shardKey) {
+ auto range1 = ChunkRange(BSON(shardKey << MINKEY), BSON(shardKey << 5));
+ ChunkType chunk1(kNss, range1, ChunkVersion(1, 0, epoch), kShardList[0].getName());
+
+ auto range2 = ChunkRange(BSON(shardKey << 5), BSON(shardKey << MAXKEY));
+ ChunkType chunk2(kNss, range2, ChunkVersion(1, 0, epoch), kShardList[1].getName());
+
+ return {chunk1, chunk2};
+ }
+
+ struct ReshardingEnv {
+ ReshardingEnv(UUID uuid) : sourceUuid(std::move(uuid)) {}
+
+ NamespaceString tempNss;
+ UUID sourceUuid;
+ ShardId destShard;
+ ChunkVersion version;
+ DatabaseVersion dbVersion;
+ };
+
+ ReshardingEnv setupReshardingEnv(OperationContext* opCtx, bool refreshTempNss) {
+ DBDirectClient client(opCtx);
+ client.createCollection(kNss.ns());
+ client.createCollection(NamespaceString::kSessionTransactionsTableNamespace.ns());
+
+ ReshardingEnv env(CollectionCatalog::get(opCtx).lookupUUIDByNSS(opCtx, kNss).value());
+ env.destShard = kShardList[1].getName();
+ env.version = ChunkVersion(1, 0, OID::gen());
+ env.dbVersion = databaseVersion::makeNew();
+
+ env.tempNss =
+ NamespaceString(kNss.db(),
+ fmt::format("{}{}",
+ NamespaceString::kTemporaryReshardingCollectionPrefix,
+ env.sourceUuid.toString()));
+
+ client.createCollection(env.tempNss.ns());
+
+
+ DatabaseType db(kNss.db().toString(), kShardList[0].getName(), true, env.dbVersion);
+
+ TypeCollectionReshardingFields reshardingFields;
+ reshardingFields.setUuid(UUID::gen());
+ reshardingFields.setDonorFields(TypeCollectionDonorFields{BSON("y" << 1)});
+
+ auto collType = createCollection(env.version.epoch());
+
+ _mockCatalogCacheLoader->setDatabaseRefreshReturnValue(db);
+ _mockCatalogCacheLoader->setCollectionRefreshValues(
+ kNss, collType, createChunks(env.version.epoch(), kShardKey), reshardingFields);
+ _mockCatalogCacheLoader->setCollectionRefreshValues(
+ env.tempNss, collType, createChunks(env.version.epoch(), "y"), boost::none);
+
+ forceShardFilteringMetadataRefresh(opCtx, kNss);
+
+ if (refreshTempNss)
+ forceShardFilteringMetadataRefresh(opCtx, env.tempNss);
+
+ return env;
+ }
+
+ void writeDoc(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& doc,
+ const ReshardingEnv& env) {
+ WriteUnitOfWork wuow(opCtx);
+ AutoGetCollection autoColl1(opCtx, nss, MODE_IX);
+
+ // TODO(SERVER-50027): This is to temporarily make this test pass until getOwnershipFilter
+ // has been updated to detect frozen migrations.
+ if (!OperationShardingState::isOperationVersioned(opCtx)) {
+ OperationShardingState::get(opCtx).initializeClientRoutingVersions(
+ nss, env.version, env.dbVersion);
+ }
+
+ auto collection = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nss);
+ ASSERT(collection);
+ auto status = collection->insertDocument(opCtx, InsertStatement(doc), nullptr);
+ ASSERT_OK(status);
+
+ wuow.commit();
+ }
+
+protected:
+ CatalogCacheLoaderMock* _mockCatalogCacheLoader;
+};
+
+TEST_F(DestinedRecipientTest, TestGetDestinedRecipient) {
+ auto opCtx = operationContext();
+ auto env = setupReshardingEnv(opCtx, true);
+
+ AutoGetCollection coll(opCtx, kNss, MODE_IX);
+
+ // TODO(SERVER-50027): This is to temporarily make this test pass until getOwnershipFilter has
+ // been updated to detect frozen migrations.
+ if (!OperationShardingState::isOperationVersioned(opCtx)) {
+ OperationShardingState::get(opCtx).initializeClientRoutingVersions(
+ kNss, env.version, env.dbVersion);
+ }
+
+ auto destShardId = getDestinedRecipient(opCtx, kNss, BSON("x" << 2 << "y" << 10));
+ ASSERT(destShardId);
+ ASSERT_EQ(*destShardId, env.destShard);
+}
+
+TEST_F(DestinedRecipientTest, TestGetDestinedRecipientThrowsOnBlockedRefresh) {
+ auto opCtx = operationContext();
+ auto env = setupReshardingEnv(opCtx, false);
+
+ AutoGetCollection coll(opCtx, kNss, MODE_IX);
+
+ // TODO(SERVER-50027): This is to temporarily make this test pass until getOwnershipFilter has
+ // been updated to detect frozen migrations.
+ if (!OperationShardingState::isOperationVersioned(opCtx)) {
+ OperationShardingState::get(opCtx).initializeClientRoutingVersions(
+ kNss, env.version, env.dbVersion);
+ }
+
+ ASSERT_THROWS(getDestinedRecipient(opCtx, kNss, BSON("x" << 2 << "y" << 10)),
+ ExceptionFor<ErrorCodes::ShardInvalidatedForTargeting>);
+}
+
+TEST_F(DestinedRecipientTest, TestOpObserverSetsDestinedRecipientOnInserts) {
+ auto opCtx = operationContext();
+ auto env = setupReshardingEnv(opCtx, true);
+
+ writeDoc(opCtx, kNss, BSON("_id" << 0 << "x" << 2 << "y" << 10), env);
+
+ repl::OplogInterfaceLocal oplogInterface(opCtx);
+ auto oplogIter = oplogInterface.makeIterator();
+
+ auto nextValue = unittest::assertGet(oplogIter->next());
+ const auto& doc = nextValue.first;
+ auto entry = unittest::assertGet(repl::OplogEntry::parse(doc));
+ auto recipShard = entry.getDestinedRecipient();
+
+ ASSERT(recipShard);
+ ASSERT_EQ(*recipShard, env.destShard);
+}
+
+TEST_F(DestinedRecipientTest, TestOpObserverSetsDestinedRecipientOnInsertsInTransaction) {
+ auto opCtx = operationContext();
+ auto env = setupReshardingEnv(opCtx, true);
+
+ auto sessionId = makeLogicalSessionIdForTest();
+ const TxnNumber txnNum = 0;
+
+ {
+ opCtx->setLogicalSessionId(sessionId);
+ opCtx->setTxnNumber(txnNum);
+ opCtx->setInMultiDocumentTransaction();
+
+ MongoDOperationContextSession ocs(opCtx);
+
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ ASSERT(txnParticipant);
+ txnParticipant.beginOrContinue(opCtx, txnNum, false, true);
+ txnParticipant.unstashTransactionResources(opCtx, "SetDestinedRecipient");
+
+ writeDoc(opCtx, kNss, BSON("_id" << 0 << "x" << 2 << "y" << 10), env);
+
+ txnParticipant.commitUnpreparedTransaction(opCtx);
+ txnParticipant.stashTransactionResources(opCtx);
+ }
+
+ // Look for destined recipient in latest oplog entry. Since this write was done in a
+ // transaction, the write operation will be embedded in an applyOps entry and needs to be
+ // extracted.
+ repl::OplogInterfaceLocal oplogInterface(opCtx);
+ auto oplogIter = oplogInterface.makeIterator();
+
+ const auto& doc = unittest::assertGet(oplogIter->next()).first;
+ auto entry = unittest::assertGet(repl::OplogEntry::parse(doc));
+ auto info = repl::ApplyOpsCommandInfo::parse(entry.getOperationToApply());
+
+ auto ops = info.getOperations();
+ auto replOp = repl::ReplOperation::parse(IDLParserErrorContext("insertOp"), ops[0]);
+ ASSERT_EQ(replOp.getNss(), kNss);
+
+ auto recipShard = replOp.getDestinedRecipient();
+ ASSERT(recipShard);
+ ASSERT_EQ(*recipShard, env.destShard);
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp
index ccfc3caead7..339f4f456d2 100644
--- a/src/mongo/db/s/resharding_util.cpp
+++ b/src/mongo/db/s/resharding_util.cpp
@@ -55,6 +55,7 @@
#include "mongo/s/async_requests_sender.h"
#include "mongo/s/grid.h"
#include "mongo/s/request_types/flush_routing_table_cache_updates_gen.h"
+#include "mongo/s/shard_invalidated_for_targeting_exception.h"
#include "mongo/s/shard_key_pattern.h"
namespace mongo {
@@ -82,7 +83,9 @@ NamespaceString getTempReshardingNss(StringData db, const UUID& sourceUuid) {
// Ensure that this shard owns the document. This must be called after verifying that we
// are in a resharding operation so that we are guaranteed that migrations are suspended.
-bool documentBelongsToMe(OperationContext* opCtx, CollectionShardingState* css, BSONObj doc) {
+bool documentBelongsToMe(OperationContext* opCtx,
+ CollectionShardingState* css,
+ const BSONObj& doc) {
auto currentKeyPattern = ShardKeyPattern(css->getCollectionDescription(opCtx).getKeyPattern());
auto ownershipFilter = css->getOwnershipFilter(
opCtx, CollectionShardingState::OrphanCleanupPolicy::kAllowOrphanCleanup);
@@ -92,7 +95,7 @@ bool documentBelongsToMe(OperationContext* opCtx, CollectionShardingState* css,
boost::optional<TypeCollectionDonorFields> getDonorFields(OperationContext* opCtx,
const NamespaceString& sourceNss,
- BSONObj fullDocument) {
+ const BSONObj& fullDocument) {
auto css = CollectionShardingState::get(opCtx, sourceNss);
auto collDesc = css->getCollectionDescription(opCtx);
@@ -680,7 +683,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> createAggForCollectionCloning(
boost::optional<ShardId> getDestinedRecipient(OperationContext* opCtx,
const NamespaceString& sourceNss,
- BSONObj fullDocument) {
+ const BSONObj& fullDocument) {
auto donorFields = getDonorFields(opCtx, sourceNss, fullDocument);
if (!donorFields)
return boost::none;
@@ -691,7 +694,7 @@ boost::optional<ShardId> getDestinedRecipient(OperationContext* opCtx,
getTempReshardingNss(sourceNss.db(), getCollectionUuid(opCtx, sourceNss)),
allowLocks);
- uassert(ErrorCodes::ShardInvalidatedForTargeting,
+ uassert(ShardInvalidatedForTargetingInfo(sourceNss),
"Routing information is not available for the temporary resharding collection.",
tempNssRoutingInfo.getStatus() != ErrorCodes::StaleShardVersion);
diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h
index 21e4d192682..247dfba84a9 100644
--- a/src/mongo/db/s/resharding_util.h
+++ b/src/mongo/db/s/resharding_util.h
@@ -179,7 +179,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> createOplogFetchingPipelineForReshard
*/
boost::optional<ShardId> getDestinedRecipient(OperationContext* opCtx,
const NamespaceString& sourceNss,
- BSONObj fullDocument);
+ const BSONObj& fullDocument);
/**
* Creates pipeline for filtering collection data matching the recipient shard.
*/
diff --git a/src/mongo/s/catalog_cache_loader_mock.cpp b/src/mongo/s/catalog_cache_loader_mock.cpp
index 4fb761e5d4c..1bfe4d9c7d8 100644
--- a/src/mongo/s/catalog_cache_loader_mock.cpp
+++ b/src/mongo/s/catalog_cache_loader_mock.cpp
@@ -76,27 +76,41 @@ void CatalogCacheLoaderMock::waitForDatabaseFlush(OperationContext* opCtx, Strin
MONGO_UNREACHABLE;
}
+CollectionAndChangedChunks getCollectionRefresh(
+ const StatusWith<CollectionType>& swCollectionReturnValue,
+ StatusWith<std::vector<ChunkType>> swChunksReturnValue,
+ const boost::optional<TypeCollectionReshardingFields>& reshardingFields) {
+ uassertStatusOK(swCollectionReturnValue);
+ uassertStatusOK(swChunksReturnValue);
+
+ // We swap the chunks out of _swChunksReturnValue to ensure if this task is
+ // scheduled multiple times that we don't inform the ChunkManager about a chunk it
+ // has already updated.
+ std::vector<ChunkType> chunks;
+ swChunksReturnValue.getValue().swap(chunks);
+
+ return CollectionAndChangedChunks(swCollectionReturnValue.getValue().getUUID(),
+ swCollectionReturnValue.getValue().getEpoch(),
+ swCollectionReturnValue.getValue().getKeyPattern().toBSON(),
+ swCollectionReturnValue.getValue().getDefaultCollation(),
+ swCollectionReturnValue.getValue().getUnique(),
+ reshardingFields,
+ std::move(chunks));
+}
+
SemiFuture<CollectionAndChangedChunks> CatalogCacheLoaderMock::getChunksSince(
const NamespaceString& nss, ChunkVersion version) {
- return makeReadyFutureWith([this] {
- uassertStatusOK(_swCollectionReturnValue);
- uassertStatusOK(_swChunksReturnValue);
-
- // We swap the chunks out of _swChunksReturnValue to ensure if this task is
- // scheduled multiple times that we don't inform the ChunkManager about a chunk it
- // has already updated.
- std::vector<ChunkType> chunks;
- _swChunksReturnValue.getValue().swap(chunks);
-
- return CollectionAndChangedChunks(
- _swCollectionReturnValue.getValue().getUUID(),
- _swCollectionReturnValue.getValue().getEpoch(),
- _swCollectionReturnValue.getValue().getKeyPattern().toBSON(),
- _swCollectionReturnValue.getValue().getDefaultCollation(),
- _swCollectionReturnValue.getValue().getUnique(),
- boost::none,
- std::move(chunks));
+ return makeReadyFutureWith([&nss, this] {
+ auto it = _refreshValues.find(nss);
+
+ if (it != _refreshValues.end())
+ return getCollectionRefresh(it->second.swCollectionReturnValue,
+ std::move(it->second.swChunksReturnValue),
+ it->second.reshardingFields);
+
+ return getCollectionRefresh(
+ _swCollectionReturnValue, std::move(_swChunksReturnValue), _reshardingFields);
})
.semi();
}
diff --git a/src/mongo/s/catalog_cache_loader_mock.h b/src/mongo/s/catalog_cache_loader_mock.h
index 9aa92f2a893..13538c4f69c 100644
--- a/src/mongo/s/catalog_cache_loader_mock.h
+++ b/src/mongo/s/catalog_cache_loader_mock.h
@@ -83,6 +83,18 @@ public:
void setDatabaseRefreshReturnValue(StatusWith<DatabaseType> swDatabase);
void clearDatabaseReturnValue();
+ void setReshardingFields(boost::optional<TypeCollectionReshardingFields> reshardingFields) {
+ _reshardingFields = std::move(reshardingFields);
+ }
+
+ void setCollectionRefreshValues(
+ const NamespaceString& nss,
+ StatusWith<CollectionType> statusWithCollection,
+ StatusWith<std::vector<ChunkType>> statusWithChunks,
+ boost::optional<TypeCollectionReshardingFields> reshardingFields) {
+ _refreshValues[nss] = {statusWithCollection, statusWithChunks, reshardingFields};
+ }
+
static const Status kCollectionInternalErrorStatus;
static const Status kChunksInternalErrorStatus;
static const Status kDatabaseInternalErrorStatus;
@@ -92,6 +104,15 @@ private:
StatusWith<std::vector<ChunkType>> _swChunksReturnValue{kChunksInternalErrorStatus};
+ boost::optional<TypeCollectionReshardingFields> _reshardingFields;
+
+ struct RefreshInfo {
+ StatusWith<CollectionType> swCollectionReturnValue{kCollectionInternalErrorStatus};
+ StatusWith<std::vector<ChunkType>> swChunksReturnValue{kChunksInternalErrorStatus};
+ boost::optional<TypeCollectionReshardingFields> reshardingFields;
+ };
+
+ stdx::unordered_map<NamespaceString, RefreshInfo> _refreshValues;
StatusWith<DatabaseType> _swDatabaseReturnValue{kDatabaseInternalErrorStatus};
};