author     Kaloian Manassiev <kaloian.manassiev@mongodb.com>   2017-11-29 10:21:19 -0500
committer  Kaloian Manassiev <kaloian.manassiev@mongodb.com>   2017-12-04 13:24:55 -0500
commit     95e95613412c33b52bb8a514751550c2447526d4 (patch)
tree       b0f86dd04b6c6f8978b6a77f6d29adffab79eddf /src/mongo/db/s
parent     7920e242c0def907b502265ca14ddf3d86c98025 (diff)
SERVER-31056 Remove usages of ScopedCollectionMetadata default constructor
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r--  src/mongo/db/s/collection_metadata.cpp                              36
-rw-r--r--  src/mongo/db/s/collection_metadata_test.cpp                          3
-rw-r--r--  src/mongo/db/s/collection_sharding_state.cpp                         5
-rw-r--r--  src/mongo/db/s/get_shard_version_command.cpp                        15
-rw-r--r--  src/mongo/db/s/merge_chunks_command.cpp                             69
-rw-r--r--  src/mongo/db/s/metadata_manager.cpp                                 15
-rw-r--r--  src/mongo/db/s/metadata_manager_test.cpp                             2
-rw-r--r--  src/mongo/db/s/migration_destination_manager_legacy_commands.cpp     7
-rw-r--r--  src/mongo/db/s/migration_source_manager.cpp                         80
-rw-r--r--  src/mongo/db/s/migration_source_manager.h                           19
-rw-r--r--  src/mongo/db/s/sharding_state.cpp                                   29
-rw-r--r--  src/mongo/db/s/split_chunk.cpp                                      11
12 files changed, 135 insertions, 156 deletions
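
Note: the theme of this commit is that ScopedCollectionMetadata may no longer be default-constructed and assigned later; every instance is now initialized at the point of declaration. A minimal sketch of the idea, with a hypothetical wrapper class standing in for the real one:

    // Hypothetical RAII wrapper, illustrative only. With the default
    // constructor deleted, callers can no longer declare an empty metadata
    // object and fill it in inside a lock scope afterwards.
    class ScopedMetadata {
    public:
        ScopedMetadata() = delete;  // the constructor whose usages are removed
        explicit ScopedMetadata(long long shardVersion)
            : _shardVersion(shardVersion) {}

        long long shardVersion() const {
            return _shardVersion;
        }

    private:
        long long _shardVersion;
    };

Callers that previously needed two-phase initialization now typically use an immediately-invoked lambda, as the merge_chunks_command.cpp and split_chunk.cpp hunks below show.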
diff --git a/src/mongo/db/s/collection_metadata.cpp b/src/mongo/db/s/collection_metadata.cpp
index c9edf1f8194..964370cc4e8 100644
--- a/src/mongo/db/s/collection_metadata.cpp
+++ b/src/mongo/db/s/collection_metadata.cpp
@@ -45,8 +45,8 @@ CollectionMetadata::CollectionMetadata(std::shared_ptr<ChunkManager> cm, const S
: _cm(std::move(cm)),
_thisShardId(thisShardId),
_shardVersion(_cm->getVersion(_thisShardId)),
- _chunksMap(SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<CachedChunkInfo>()),
- _rangesMap(SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<CachedChunkInfo>()) {
+ _chunksMap(SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<BSONObj>()),
+ _rangesMap(SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<BSONObj>()) {
invariant(_cm->getVersion().isSet());
invariant(_cm->getVersion() >= _shardVersion);
@@ -55,9 +55,7 @@ CollectionMetadata::CollectionMetadata(std::shared_ptr<ChunkManager> cm, const S
if (chunk->getShardId() != _thisShardId)
continue;
- _chunksMap.emplace_hint(_chunksMap.end(),
- chunk->getMin(),
- CachedChunkInfo(chunk->getMax(), chunk->getLastmod()));
+ _chunksMap.emplace_hint(_chunksMap.end(), chunk->getMin(), chunk->getMax());
}
if (_chunksMap.empty()) {
@@ -81,7 +79,7 @@ void CollectionMetadata::_buildRangesMap() {
for (const auto& entry : _chunksMap) {
BSONObj const& currMin = entry.first;
- BSONObj const& currMax = entry.second.getMaxKey();
+ BSONObj const& currMax = entry.second;
// Coalesce the chunk's bounds in ranges if they are adjacent chunks
if (min.isEmpty()) {
@@ -95,8 +93,7 @@ void CollectionMetadata::_buildRangesMap() {
continue;
}
- _rangesMap.emplace_hint(
- _rangesMap.end(), min, CachedChunkInfo(max, ChunkVersion::IGNORED()));
+ _rangesMap.emplace_hint(_rangesMap.end(), min, max);
min = currMin;
max = currMax;
@@ -105,7 +102,7 @@ void CollectionMetadata::_buildRangesMap() {
invariant(!min.isEmpty());
invariant(!max.isEmpty());
- _rangesMap.emplace(min, CachedChunkInfo(max, ChunkVersion::IGNORED()));
+ _rangesMap.emplace(min, max);
}
bool CollectionMetadata::keyBelongsToMe(const BSONObj& key) const {
@@ -117,7 +114,7 @@ bool CollectionMetadata::keyBelongsToMe(const BSONObj& key) const {
if (it != _rangesMap.begin())
it--;
- return rangeContains(it->first, it->second.getMaxKey(), key);
+ return rangeContains(it->first, it->second, key);
}
bool CollectionMetadata::getNextChunk(const BSONObj& lookupKey, ChunkType* chunk) const {
@@ -130,16 +127,15 @@ bool CollectionMetadata::getNextChunk(const BSONObj& lookupKey, ChunkType* chunk
lowerChunkIt = _chunksMap.end();
}
- if (lowerChunkIt != _chunksMap.end() &&
- lowerChunkIt->second.getMaxKey().woCompare(lookupKey) > 0) {
+ if (lowerChunkIt != _chunksMap.end() && lowerChunkIt->second.woCompare(lookupKey) > 0) {
chunk->setMin(lowerChunkIt->first);
- chunk->setMax(lowerChunkIt->second.getMaxKey());
+ chunk->setMax(lowerChunkIt->second);
return true;
}
if (upperChunkIt != _chunksMap.end()) {
chunk->setMin(upperChunkIt->first);
- chunk->setMax(upperChunkIt->second.getMaxKey());
+ chunk->setMax(upperChunkIt->second);
return true;
}
@@ -154,7 +150,7 @@ bool CollectionMetadata::getDifferentChunk(const BSONObj& chunkMinKey,
while (lowerChunkIt != upperChunkIt) {
if (lowerChunkIt->first.woCompare(chunkMinKey) != 0) {
differentChunk->setMin(lowerChunkIt->first);
- differentChunk->setMax(lowerChunkIt->second.getMaxKey());
+ differentChunk->setMax(lowerChunkIt->second);
return true;
}
++lowerChunkIt;
@@ -202,7 +198,7 @@ void CollectionMetadata::toBSONChunks(BSONArrayBuilder& bb) const {
for (RangeMap::const_iterator it = _chunksMap.begin(); it != _chunksMap.end(); ++it) {
BSONArrayBuilder chunkBB(bb.subarrayStart());
chunkBB.append(it->first);
- chunkBB.append(it->second.getMaxKey());
+ chunkBB.append(it->second);
chunkBB.done();
}
}
@@ -235,8 +231,8 @@ boost::optional<KeyRange> CollectionMetadata::getNextOrphanRange(
// If we overlap, continue after the overlap
// TODO: Could optimize slightly by finding next non-contiguous chunk
- if (lowerIt != map.end() && lowerIt->second.getMaxKey().woCompare(lookupKey) > 0) {
- lookupKey = lowerIt->second.getMaxKey(); // note side effect
+ if (lowerIt != map.end() && lowerIt->second.woCompare(lookupKey) > 0) {
+ lookupKey = lowerIt->second; // note side effect
return boost::none;
} else {
return Its(lowerIt, upperIt);
@@ -258,8 +254,8 @@ boost::optional<KeyRange> CollectionMetadata::getNextOrphanRange(
// bounds of the surrounding ranges in both maps.
auto lowerIt = its.first, upperIt = its.second;
- if (lowerIt != map.end() && lowerIt->second.getMaxKey().woCompare(range->minKey) > 0) {
- range->minKey = lowerIt->second.getMaxKey();
+ if (lowerIt != map.end() && lowerIt->second.woCompare(range->minKey) > 0) {
+ range->minKey = lowerIt->second;
}
if (upperIt != map.end() && upperIt->first.woCompare(range->maxKey) < 0) {
range->maxKey = upperIt->first;
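
Note: after this change _rangesMap maps each range's min key directly to its max key, so keyBelongsToMe reduces to an upper_bound lookup followed by a single containment test. A self-contained sketch of the same lookup over int keys (illustrative; the real maps are BSONObj-indexed):

    #include <map>

    // Ranges stored as minKey -> maxKey, half-open intervals [min, max).
    using RangeMap = std::map<int, int>;

    bool keyBelongsToMe(const RangeMap& ranges, int key) {
        if (ranges.empty())
            return false;

        // Find the first range starting strictly after 'key', then step
        // back to the only range that could contain it.
        auto it = ranges.upper_bound(key);
        if (it == ranges.begin())
            return false;
        --it;

        // Equivalent of rangeContains(it->first, it->second, key).
        return it->first <= key && key < it->second;
    }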
diff --git a/src/mongo/db/s/collection_metadata_test.cpp b/src/mongo/db/s/collection_metadata_test.cpp
index 20bf7c2b40e..14229bc3965 100644
--- a/src/mongo/db/s/collection_metadata_test.cpp
+++ b/src/mongo/db/s/collection_metadata_test.cpp
@@ -83,8 +83,7 @@ protected:
};
struct stRangeMap : public RangeMap {
- stRangeMap()
- : RangeMap(SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<CachedChunkInfo>()) {}
+ stRangeMap() : RangeMap(SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<BSONObj>()) {}
};
TEST_F(NoChunkFixture, BasicBelongsToMe) {
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index 93b54051160..3ead8021941 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -213,10 +213,7 @@ void CollectionShardingState::checkShardVersionOrThrow(OperationContext* opCtx)
ChunkVersion wanted;
if (!_checkShardVersionOk(opCtx, &errmsg, &received, &wanted)) {
throw StaleConfigException(
- _nss.ns(),
- str::stream() << "[" << _nss.ns() << "] shard version not ok: " << errmsg,
- received,
- wanted);
+ _nss.ns(), str::stream() << "shard version not ok: " << errmsg, received, wanted);
}
}
diff --git a/src/mongo/db/s/get_shard_version_command.cpp b/src/mongo/db/s/get_shard_version_command.cpp
index f400d9f31bc..91c7707eae6 100644
--- a/src/mongo/db/s/get_shard_version_command.cpp
+++ b/src/mongo/db/s/get_shard_version_command.cpp
@@ -87,13 +87,10 @@ public:
const BSONObj& cmdObj,
BSONObjBuilder& result) override {
const NamespaceString nss(parseNs(dbname, cmdObj));
- uassert(ErrorCodes::InvalidNamespace,
- str::stream() << nss.ns() << " is not a valid namespace",
- nss.isValid());
- ShardingState* const gss = ShardingState::get(opCtx);
- if (gss->enabled()) {
- result.append("configServer", gss->getConfigServer(opCtx).toString());
+ ShardingState* const shardingState = ShardingState::get(opCtx);
+ if (shardingState->enabled()) {
+ result.append("configServer", shardingState->getConfigServer(opCtx).toString());
} else {
result.append("configServer", "");
}
@@ -109,11 +106,7 @@ public:
AutoGetCollection autoColl(opCtx, nss, MODE_IS);
CollectionShardingState* const css = CollectionShardingState::get(opCtx, nss);
- ScopedCollectionMetadata metadata;
- if (css) {
- metadata = css->getMetadata();
- }
-
+ const auto metadata = css->getMetadata();
if (metadata) {
result.appendTimestamp("global", metadata->getShardVersion().toLong());
} else {
diff --git a/src/mongo/db/s/merge_chunks_command.cpp b/src/mongo/db/s/merge_chunks_command.cpp
index c6281ce9bdb..f86073a55da 100644
--- a/src/mongo/db/s/merge_chunks_command.cpp
+++ b/src/mongo/db/s/merge_chunks_command.cpp
@@ -57,17 +57,18 @@ using std::vector;
namespace {
-bool _checkMetadataForSuccess(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& minKey,
- const BSONObj& maxKey) {
- ScopedCollectionMetadata metadataAfterMerge;
- {
+bool checkMetadataForSuccess(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& minKey,
+ const BSONObj& maxKey) {
+ const auto metadataAfterMerge = [&] {
AutoGetCollection autoColl(opCtx, nss, MODE_IS);
+ return CollectionShardingState::get(opCtx, nss.ns())->getMetadata();
+ }();
- // Get collection metadata
- metadataAfterMerge = CollectionShardingState::get(opCtx, nss.ns())->getMetadata();
- }
+ uassert(ErrorCodes::StaleConfig,
+ str::stream() << "Collection " << nss.ns() << " became unsharded",
+ metadataAfterMerge);
ChunkType chunk;
if (!metadataAfterMerge->getNextChunk(minKey, &chunk)) {
@@ -100,15 +101,14 @@ Status mergeChunks(OperationContext* opCtx,
return Status(scopedDistLock.getStatus().code(), errmsg);
}
- ShardingState* shardingState = ShardingState::get(opCtx);
+ auto const shardingState = ShardingState::get(opCtx);
//
// We now have the collection lock, refresh metadata to latest version and sanity check
//
- ChunkVersion shardVersion;
- Status refreshStatus = shardingState->refreshMetadataNow(opCtx, nss, &shardVersion);
-
+ ChunkVersion unusedShardVersion;
+ Status refreshStatus = shardingState->refreshMetadataNow(opCtx, nss, &unusedShardVersion);
if (!refreshStatus.isOK()) {
std::string errmsg = str::stream()
<< "could not merge chunks, failed to refresh metadata for " << nss.ns()
@@ -118,33 +118,31 @@ Status mergeChunks(OperationContext* opCtx,
return Status(refreshStatus.code(), errmsg);
}
- if (epoch.isSet() && shardVersion.epoch() != epoch) {
- std::string errmsg = stream()
- << "could not merge chunks, collection " << nss.ns() << " has changed"
- << " since merge was sent"
- << "(sent epoch : " << epoch.toString()
- << ", current epoch : " << shardVersion.epoch().toString() << ")";
+ const auto metadata = [&] {
+ AutoGetCollection autoColl(opCtx, nss, MODE_IS);
+ return CollectionShardingState::get(opCtx, nss.ns())->getMetadata();
+ }();
+
+ if (!metadata) {
+ std::string errmsg = stream() << "could not merge chunks, collection " << nss.ns()
+ << " is not sharded";
warning() << errmsg;
- return Status(ErrorCodes::StaleEpoch, errmsg);
+ return {ErrorCodes::StaleEpoch, errmsg};
}
- ScopedCollectionMetadata metadata;
- {
- AutoGetCollection autoColl(opCtx, nss, MODE_IS);
+ const auto shardVersion = metadata->getShardVersion();
- metadata = CollectionShardingState::get(opCtx, nss.ns())->getMetadata();
- if (!metadata) {
- std::string errmsg = stream() << "could not merge chunks, collection " << nss.ns()
- << " is not sharded";
+ if (epoch.isSet() && shardVersion.epoch() != epoch) {
+ std::string errmsg = stream()
+ << "could not merge chunks, collection " << nss.ns()
+ << " has changed since merge was sent (sent epoch: " << epoch.toString()
+ << ", current epoch: " << shardVersion.epoch() << ")";
- warning() << errmsg;
- return Status(ErrorCodes::IllegalOperation, errmsg);
- }
+ warning() << errmsg;
+ return {ErrorCodes::StaleEpoch, errmsg};
}
- dassert(metadata->getShardVersion().equals(shardVersion));
-
if (!metadata->isValidKey(minKey) || !metadata->isValidKey(maxKey)) {
std::string errmsg = stream() << "could not merge chunks, the range "
<< redact(ChunkRange(minKey, maxKey).toString())
@@ -156,7 +154,6 @@ Status mergeChunks(OperationContext* opCtx,
return Status(ErrorCodes::IllegalOperation, errmsg);
}
-
//
// Get merged chunk information
//
@@ -277,8 +274,8 @@ Status mergeChunks(OperationContext* opCtx,
// running _configsvrCommitChunkMerge).
//
{
- ChunkVersion shardVersionAfterMerge;
- refreshStatus = shardingState->refreshMetadataNow(opCtx, nss, &shardVersionAfterMerge);
+ ChunkVersion unusedShardVersion;
+ refreshStatus = shardingState->refreshMetadataNow(opCtx, nss, &unusedShardVersion);
if (!refreshStatus.isOK()) {
std::string errmsg = str::stream() << "failed to refresh metadata for merge chunk ["
@@ -304,7 +301,7 @@ Status mergeChunks(OperationContext* opCtx,
auto writeConcernStatus = std::move(cmdResponseStatus.getValue().writeConcernStatus);
if ((!commandStatus.isOK() || !writeConcernStatus.isOK()) &&
- _checkMetadataForSuccess(opCtx, nss, minKey, maxKey)) {
+ checkMetadataForSuccess(opCtx, nss, minKey, maxKey)) {
LOG(1) << "mergeChunk [" << redact(minKey) << "," << redact(maxKey)
<< ") has already been committed.";
diff --git a/src/mongo/db/s/metadata_manager.cpp b/src/mongo/db/s/metadata_manager.cpp
index ad26fe22a10..c98a506f05b 100644
--- a/src/mongo/db/s/metadata_manager.cpp
+++ b/src/mongo/db/s/metadata_manager.cpp
@@ -160,8 +160,7 @@ MetadataManager::MetadataManager(ServiceContext* serviceContext,
: _serviceContext(serviceContext),
_nss(std::move(nss)),
_executor(executor),
- _receivingChunks(
- SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<CachedChunkInfo>()) {}
+ _receivingChunks(SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<BSONObj>()) {}
MetadataManager::~MetadataManager() {
stdx::lock_guard<stdx::mutex> lg(_managerLock);
@@ -265,7 +264,7 @@ void MetadataManager::refreshActiveMetadata(std::unique_ptr<CollectionMetadata>
// Should be no more than one.
for (auto it = _receivingChunks.begin(); it != _receivingChunks.end();) {
BSONObj const& min = it->first;
- BSONObj const& max = it->second.getMaxKey();
+ BSONObj const& max = it->second;
if (!remoteMetadata->rangeOverlapsChunk(ChunkRange(min, max))) {
++it;
@@ -308,7 +307,7 @@ void MetadataManager::toBSONPending(BSONArrayBuilder& bb) const {
for (auto it = _receivingChunks.begin(); it != _receivingChunks.end(); ++it) {
BSONArrayBuilder pendingBB(bb.subarrayStart());
pendingBB.append(it->first);
- pendingBB.append(it->second.getMaxKey());
+ pendingBB.append(it->second);
pendingBB.done();
}
}
@@ -321,7 +320,7 @@ void MetadataManager::append(BSONObjBuilder* builder) const {
BSONArrayBuilder pcArr(builder->subarrayStart("pendingChunks"));
for (const auto& entry : _receivingChunks) {
BSONObjBuilder obj;
- ChunkRange r = ChunkRange(entry.first, entry.second.getMaxKey());
+ ChunkRange r = ChunkRange(entry.first, entry.second);
r.append(&obj);
pcArr.append(obj.done());
}
@@ -334,7 +333,7 @@ void MetadataManager::append(BSONObjBuilder* builder) const {
BSONArrayBuilder amrArr(builder->subarrayStart("activeMetadataRanges"));
for (const auto& entry : _metadata.back()->metadata.getChunks()) {
BSONObjBuilder obj;
- ChunkRange r = ChunkRange(entry.first, entry.second.getMaxKey());
+ ChunkRange r = ChunkRange(entry.first, entry.second);
r.append(&obj);
amrArr.append(obj.done());
}
@@ -360,9 +359,7 @@ void MetadataManager::_pushListToClean(WithLock, std::list<Deletion> ranges) {
}
void MetadataManager::_addToReceiving(WithLock, ChunkRange const& range) {
- _receivingChunks.insert(
- std::make_pair(range.getMin().getOwned(),
- CachedChunkInfo(range.getMax().getOwned(), ChunkVersion::IGNORED())));
+ _receivingChunks.insert(std::make_pair(range.getMin().getOwned(), range.getMax().getOwned()));
}
auto MetadataManager::beginReceive(ChunkRange const& range) -> CleanupNotification {
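
Note: _receivingChunks keeps its comparator-aware construction (makeBSONObjIndexedMap) but now stores plain max keys as values. BSONObj has no natural operator<, so the map must be built from an explicit comparator instance; a sketch of the equivalent standard-library construction, with a hypothetical Key type standing in for BSONObj:

    #include <map>
    #include <string>

    // Stand-in for BSONObj: a type with no operator< of its own.
    struct Key {
        std::string bytes;
    };

    // Stand-in for SimpleBSONObjComparator: supplies the map's ordering.
    struct KeyComparator {
        bool operator()(const Key& a, const Key& b) const {
            return a.bytes < b.bytes;
        }
    };

    // Analogue of makeBSONObjIndexedMap<BSONObj>(): ordering comes from the
    // comparator instance rather than from the key type itself.
    template <typename V>
    using KeyIndexedMap = std::map<Key, V, KeyComparator>;

    KeyIndexedMap<Key> receivingChunks{KeyComparator{}};  // minKey -> maxKey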
diff --git a/src/mongo/db/s/metadata_manager_test.cpp b/src/mongo/db/s/metadata_manager_test.cpp
index 7dc36756544..eb0999b6b00 100644
--- a/src/mongo/db/s/metadata_manager_test.cpp
+++ b/src/mongo/db/s/metadata_manager_test.cpp
@@ -304,7 +304,7 @@ TEST_F(MetadataManagerTest, RefreshMetadataAfterDropAndRecreate) {
const auto chunkEntry = _manager->getActiveMetadata(_manager)->getChunks().begin();
ASSERT_BSONOBJ_EQ(BSON("key" << 20), chunkEntry->first);
- ASSERT_BSONOBJ_EQ(BSON("key" << 30), chunkEntry->second.getMaxKey());
+ ASSERT_BSONOBJ_EQ(BSON("key" << 30), chunkEntry->second);
}
// Tests membership functions for _rangesToClean
diff --git a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
index d7d1a351973..1024433b5d6 100644
--- a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
+++ b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
@@ -102,9 +102,8 @@ public:
// Refresh our collection manager from the config server, we need a collection manager to
// start registering pending chunks. We force the remote refresh here to make the behavior
// consistent and predictable, generally we'd refresh anyway, and to be paranoid.
- ChunkVersion currentVersion;
-
- Status status = shardingState->refreshMetadataNow(opCtx, nss, &currentVersion);
+ ChunkVersion shardVersion;
+ Status status = shardingState->refreshMetadataNow(opCtx, nss, &shardVersion);
if (!status.isOK()) {
errmsg = str::stream() << "cannot start receiving chunk "
<< redact(chunkRange.toString()) << causedBy(redact(status));
@@ -147,7 +146,7 @@ public:
chunkRange.getMin(),
chunkRange.getMax(),
shardKeyPattern,
- currentVersion.epoch(),
+ shardVersion.epoch(),
writeConcern));
result.appendBool("started", true);
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index b26dfb91b2e..9d9d424bf24 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -36,8 +36,6 @@
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/operation_context.h"
-#include "mongo/db/s/collection_metadata.h"
-#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/migration_chunk_cloner_source_legacy.h"
#include "mongo/db/s/migration_util.h"
#include "mongo/db/s/shard_metadata_util.h"
@@ -126,8 +124,7 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx,
HostAndPort recipientHost)
: _args(std::move(request)),
_donorConnStr(std::move(donorConnStr)),
- _recipientHost(std::move(recipientHost)),
- _startTime() {
+ _recipientHost(std::move(recipientHost)) {
invariant(!opCtx->lockState()->isLocked());
// Disallow moving a chunk to ourselves
@@ -226,10 +223,10 @@ Status MigrationSourceManager::startClone(OperationContext* opCtx) {
<< "to"
<< _args.getToShardId()),
ShardingCatalogClient::kMajorityWriteConcern)
- .transitional_ignore();
+ .ignore();
_cloneDriver = stdx::make_unique<MigrationChunkClonerSourceLegacy>(
- _args, _collectionMetadata->getKeyPattern(), _donorConnStr, _recipientHost);
+ _args, _keyPattern, _donorConnStr, _recipientHost);
{
// Register for notifications from the replication subsystem
@@ -271,38 +268,13 @@ Status MigrationSourceManager::enterCriticalSection(OperationContext* opCtx) {
invariant(_state == kCloneCaughtUp);
auto scopedGuard = MakeGuard([&] { cleanupOnError(opCtx); });
- const ShardId& recipientId = _args.getToShardId();
- if (!_collectionMetadata->getChunkManager()->getVersion(recipientId).isSet() &&
- (serverGlobalParams.featureCompatibility.getVersion() ==
- ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo36)) {
- // The recipient didn't have any chunks of this collection. Write the no-op message so that
- // change stream will notice that and close cursor to notify mongos to target to the new
- // shard.
- std::stringstream ss;
- // The message for debugging.
- ss << "Migrating chunk from shard " << _args.getFromShardId() << " to shard "
- << _args.getToShardId() << " with no chunks for this collection";
- // The message expected by change streams.
- auto message = BSON("type"
- << "migrateChunkToNewShard"
- << "from"
- << _args.getFromShardId()
- << "to"
- << _args.getToShardId());
- AutoGetCollection autoColl(opCtx, NamespaceString::kRsOplogNamespace, MODE_IX);
- writeConflictRetry(
- opCtx, "migrateChunkToNewShard", NamespaceString::kRsOplogNamespace.ns(), [&] {
- WriteUnitOfWork uow(opCtx);
- opCtx->getClient()->getServiceContext()->getOpObserver()->onInternalOpMessage(
- opCtx, getNss(), _collectionUuid, BSON("msg" << ss.str()), message);
- uow.commit();
- });
- }
+ _notifyChangeStreamsOnRecipientFirstChunk(opCtx);
// Mark the shard as running critical operation, which requires recovery on crash.
//
- // Note: the 'migrateChunkToNewShard' oplog message written above depends on this
- // majority write to carry its local write to majority committed.
+ // NOTE: The 'migrateChunkToNewShard' oplog message written by the above call to
+ // '_notifyChangeStreamsOnRecipientFirstChunk' depends on this majority write to carry its local
+ // write to majority committed.
Status status = ShardingStateRecovery::startMetadataOp(opCtx);
if (!status.isOK()) {
return status;
@@ -547,7 +519,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC
<< "to"
<< _args.getToShardId()),
ShardingCatalogClient::kMajorityWriteConcern)
- .transitional_ignore();
+ .ignore();
// Wait for the metadata update to be persisted before attempting to delete orphaned documents
// so that metadata changes propagate to secondaries first
@@ -605,11 +577,45 @@ void MigrationSourceManager::cleanupOnError(OperationContext* opCtx) {
<< "to"
<< _args.getToShardId()),
ShardingCatalogClient::kMajorityWriteConcern)
- .transitional_ignore();
+ .ignore();
_cleanup(opCtx);
}
+void MigrationSourceManager::_notifyChangeStreamsOnRecipientFirstChunk(OperationContext* opCtx) {
+ // Change streams are only supported in 3.6 and above
+ if (serverGlobalParams.featureCompatibility.getVersion() !=
+ ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo36)
+ return;
+
+ // If this is not the first donation, there is nothing to be done
+ if (_collectionMetadata->getChunkManager()->getVersion(_args.getToShardId()).isSet())
+ return;
+
+ const std::string dbgMessage = str::stream()
+ << "Migrating chunk from shard " << _args.getFromShardId() << " to shard "
+ << _args.getToShardId() << " with no chunks for this collection";
+
+ // The message expected by change streams
+ const auto o2Message = BSON("type"
+ << "migrateChunkToNewShard"
+ << "from"
+ << _args.getFromShardId()
+ << "to"
+ << _args.getToShardId());
+
+ auto const serviceContext = opCtx->getClient()->getServiceContext();
+
+ AutoGetCollection autoColl(opCtx, NamespaceString::kRsOplogNamespace, MODE_IX);
+ writeConflictRetry(
+ opCtx, "migrateChunkToNewShard", NamespaceString::kRsOplogNamespace.ns(), [&] {
+ WriteUnitOfWork uow(opCtx);
+ serviceContext->getOpObserver()->onInternalOpMessage(
+ opCtx, getNss(), _collectionUuid, BSON("msg" << dbgMessage), o2Message);
+ uow.commit();
+ });
+}
+
void MigrationSourceManager::_cleanup(OperationContext* opCtx) {
invariant(_state != kDone);
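
Note: the extracted _notifyChangeStreamsOnRecipientFirstChunk helper still performs its oplog write inside writeConflictRetry, which re-runs the closure until it commits without a write conflict. A minimal sketch of that retry shape under the assumption of a simple catch-and-retry loop (the real implementation also handles backoff and logging):

    #include <stdexcept>

    struct WriteConflictException : std::runtime_error {
        WriteConflictException() : std::runtime_error("write conflict") {}
    };

    // Keep re-running the write closure until it completes without a
    // WriteConflictException (e.g. a WriteUnitOfWork that commits).
    template <typename F>
    auto writeConflictRetrySketch(F&& attemptWrite) {
        while (true) {
            try {
                return attemptWrite();
            } catch (const WriteConflictException&) {
                // Another writer won; the storage transaction rolled back,
                // so retry the whole unit of work.
            }
        }
    }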
diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h
index 0f90ac96f2b..255aa984024 100644
--- a/src/mongo/db/s/migration_source_manager.h
+++ b/src/mongo/db/s/migration_source_manager.h
@@ -28,12 +28,9 @@
#pragma once
-#include <string>
-
#include "mongo/base/disallow_copying.h"
-#include "mongo/db/s/metadata_manager.h"
+#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/s/move_chunk_request.h"
-#include "mongo/s/shard_key_pattern.h"
#include "mongo/util/concurrency/notification.h"
#include "mongo/util/timer.h"
@@ -157,13 +154,6 @@ public:
void cleanupOnError(OperationContext* opCtx);
/**
- * Returns the key pattern object for the stored committed metadata.
- */
- BSONObj getKeyPattern() const {
- return _keyPattern;
- }
-
- /**
* Returns the cloner which is being used for this migration. This value is available only if
* the migration source manager is currently in the clone phase (i.e. the previous call to
* startClone has succeeded).
@@ -197,6 +187,13 @@ private:
enum State { kCreated, kCloning, kCloneCaughtUp, kCriticalSection, kCloneCompleted, kDone };
/**
+ * If this donation moves the first chunk to the recipient (i.e., the recipient didn't have any
+ * chunks), this function writes a no-op message to the oplog, so that change stream will notice
+ * that and close the cursor in order to notify mongos to target the new shard as well.
+ */
+ void _notifyChangeStreamsOnRecipientFirstChunk(OperationContext* opCtx);
+
+ /**
* Called when any of the states fails. May only be called once and will put the migration
* manager into the kDone state.
*/
diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp
index 2f46a11552e..608a4bac233 100644
--- a/src/mongo/db/s/sharding_state.cpp
+++ b/src/mongo/db/s/sharding_state.cpp
@@ -243,26 +243,23 @@ Status ShardingState::onStaleShardVersion(OperationContext* opCtx,
auto& oss = OperationShardingState::get(opCtx);
oss.waitForMigrationCriticalSectionSignal(opCtx);
- ChunkVersion collectionShardVersion;
-
- // Fast path - check if the requested version is at a higher version than the current metadata
- // version or a different epoch before verifying against config server.
- ScopedCollectionMetadata currentMetadata;
-
- {
+ const auto collectionShardVersion = [&] {
+ // Fast path - check if the requested version is at a higher version than the current
+ // metadata version or a different epoch before verifying against config server
AutoGetCollection autoColl(opCtx, nss, MODE_IS);
-
- currentMetadata = CollectionShardingState::get(opCtx, nss)->getMetadata();
+ const auto currentMetadata = CollectionShardingState::get(opCtx, nss)->getMetadata();
if (currentMetadata) {
- collectionShardVersion = currentMetadata->getShardVersion();
+ return currentMetadata->getShardVersion();
}
- if (collectionShardVersion.epoch() == expectedVersion.epoch() &&
- collectionShardVersion >= expectedVersion) {
- // Don't need to remotely reload if we're in the same epoch and the requested version is
- // smaller than the one we know about. This means that the remote side is behind.
- return Status::OK();
- }
+ return ChunkVersion::UNSHARDED();
+ }();
+
+ if (collectionShardVersion.epoch() == expectedVersion.epoch() &&
+ collectionShardVersion >= expectedVersion) {
+ // Don't need to remotely reload if we're in the same epoch and the requested version is
+ // smaller than the one we know about. This means that the remote side is behind.
+ return Status::OK();
}
try {
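
Note: the fast path above skips a remote refresh when the locally cached version is at least as new as the requested one within the same epoch. A sketch of that comparison with a simplified version type (hypothetical fields; the real ChunkVersion packs major/minor components and carries an OID epoch):

    #include <cstdint>

    // Simplified stand-in for ChunkVersion: an epoch identifying the
    // collection incarnation, plus a monotonically increasing version
    // that is only comparable within one epoch.
    struct Version {
        uint64_t epoch;
        uint64_t combined;  // major/minor packed into one value
    };

    // True when no remote reload is needed: same epoch and the local
    // version is at least the requested one, meaning the remote side is
    // behind (mirrors the fast path in onStaleShardVersion).
    bool fastPathUpToDate(const Version& local, const Version& requested) {
        return local.epoch == requested.epoch &&
            local.combined >= requested.combined;
    }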
diff --git a/src/mongo/db/s/split_chunk.cpp b/src/mongo/db/s/split_chunk.cpp
index 3dea2009f94..77120fb2674 100644
--- a/src/mongo/db/s/split_chunk.cpp
+++ b/src/mongo/db/s/split_chunk.cpp
@@ -95,13 +95,14 @@ bool checkMetadataForSuccessfulSplitChunk(OperationContext* opCtx,
const NamespaceString& nss,
const ChunkRange& chunkRange,
const std::vector<BSONObj>& splitKeys) {
- ScopedCollectionMetadata metadataAfterSplit;
- {
+ const auto metadataAfterSplit = [&] {
AutoGetCollection autoColl(opCtx, nss, MODE_IS);
+ return CollectionShardingState::get(opCtx, nss.ns())->getMetadata();
+ }();
- // Get collection metadata
- metadataAfterSplit = CollectionShardingState::get(opCtx, nss.ns())->getMetadata();
- }
+ uassert(ErrorCodes::StaleConfig,
+ str::stream() << "Collection " << nss.ns() << " became unsharded",
+ metadataAfterSplit);
auto newChunkBounds(splitKeys);
auto startKey = chunkRange.getMin();
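
Note: both checkMetadataForSuccess and checkMetadataForSuccessfulSplitChunk now fail loudly with uassert when the collection has become unsharded, instead of dereferencing a possibly-empty ScopedCollectionMetadata. uassert raises a code-tagged exception when its condition is falsy; a simplified sketch of the idiom (not the real macro, which also accepts error-code enums and non-bool conditions):

    #include <stdexcept>
    #include <string>

    // Simplified stand-in for MongoDB's assertion exception type.
    struct AssertionException : std::runtime_error {
        AssertionException(int code, const std::string& msg)
            : std::runtime_error(msg), code(code) {}
        int code;
    };

    inline void uassertSketch(int code, const std::string& msg, bool cond) {
        if (!cond)
            throw AssertionException(code, msg);
    }

    // Usage mirroring the patch (kStaleConfig is a placeholder code):
    //   uassertSketch(kStaleConfig,
    //                 "Collection " + nss + " became unsharded",
    //                 static_cast<bool>(metadataAfterSplit));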