author    Kaloian Manassiev <kaloian.manassiev@mongodb.com>  2022-04-07 07:11:45 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>   2022-04-07 08:03:37 +0000
commit    c257704b483fb3938465f5eb4f2a01e1d1e7c119 (patch)
tree      75bd1d24a5a819d2d53b8c372d547e9200c01484 /src/mongo/db/s/create_collection_coordinator.cpp
parent    487babfa93b6f6f2fb500ecb5411045be89e881c (diff)
SERVER-64822 Clear filtering metadata if createCollection commit fails
Diffstat (limited to 'src/mongo/db/s/create_collection_coordinator.cpp')
-rw-r--r--  src/mongo/db/s/create_collection_coordinator.cpp | 134
1 file changed, 57 insertions(+), 77 deletions(-)
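
In short, this commit replaces the upsert-based updateCatalogEntry() helper with an insert-only insertCollectionEntry(), folds the old _finalize() step into _commit(), and clears the shard's locally cached filtering metadata whenever the commit or the follow-up refresh throws, so a later operation can recover the correct shard version. Below is a minimal, standalone C++ sketch of that recovery pattern only; ShardState and commitToConfigServer() are hypothetical stand-ins, not the coordinator's real API.

// Sketch: clear the cached filtering metadata when a commit against the config
// server fails, mirroring the behavior this commit adds to _commit().
// ShardState and commitToConfigServer() are hypothetical stand-ins.
#include <iostream>
#include <stdexcept>

struct ShardState {
    bool filteringMetadataKnown = true;
    void clearFilteringMetadata() { filteringMetadataKnown = false; }
};

void commitToConfigServer(bool simulateFailure) {
    if (simulateFailure)
        throw std::runtime_error("config server write failed");
}

void commitCollection(ShardState& shard, bool simulateFailure) {
    try {
        commitToConfigServer(simulateFailure);
        // On success, the real coordinator would refresh the filtering metadata here.
    } catch (const std::exception&) {
        // The local shard version may now disagree with the config server, so mark
        // it unknown, let a future operation refresh it, and rethrow.
        shard.clearFilteringMetadata();
        throw;
    }
}

int main() {
    ShardState shard;
    try {
        commitCollection(shard, /*simulateFailure=*/true);
    } catch (const std::exception& ex) {
        std::cout << "commit failed (" << ex.what() << "), filtering metadata known: "
                  << std::boolalpha << shard.filteringMetadataKnown << '\n';  // prints false
    }
    return 0;
}

In the real coordinator the same step runs under an UninterruptibleLockGuard and an IX collection lock before rethrowing, as the hunk in _commit() below shows.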
diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp
index b190f458c47..d3ec9d03230 100644
--- a/src/mongo/db/s/create_collection_coordinator.cpp
+++ b/src/mongo/db/s/create_collection_coordinator.cpp
@@ -315,52 +315,27 @@ void insertChunks(OperationContext* opCtx,
}
}
-void updateCatalogEntry(OperationContext* opCtx,
- const NamespaceString& nss,
- CollectionType& coll,
- const OperationSessionInfo& osi) {
+void insertCollectionEntry(OperationContext* opCtx,
+ const NamespaceString& nss,
+ CollectionType& coll,
+ const OperationSessionInfo& osi) {
const auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
- BatchedCommandRequest updateRequest([&]() {
- write_ops::UpdateCommandRequest updateOp(CollectionType::ConfigNS);
- updateOp.setUpdates({[&] {
- write_ops::UpdateOpEntry entry;
- entry.setQ(BSON(CollectionType::kNssFieldName << nss.ns()));
- entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(coll.toBSON()));
- entry.setUpsert(true);
- entry.setMulti(false);
- return entry;
- }()});
- return updateOp;
- }());
-
- updateRequest.setWriteConcern(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
- const BSONObj cmdObj = updateRequest.toBSON().addFields(osi.toBSON());
+ BatchedCommandRequest insertRequest(
+ write_ops::InsertCommandRequest(CollectionType::ConfigNS, {coll.toBSON()}));
+ insertRequest.setWriteConcern(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
- try {
- BatchedCommandResponse batchResponse;
- const auto response =
- configShard->runCommand(opCtx,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- CollectionType::ConfigNS.db().toString(),
- cmdObj,
- Shard::kDefaultConfigCommandTimeout,
- Shard::RetryPolicy::kIdempotent);
-
- const auto writeStatus =
- Shard::CommandResponse::processBatchWriteResponse(response, &batchResponse);
-
- uassertStatusOK(batchResponse.toStatus());
- uassertStatusOK(writeStatus);
- } catch (const DBException&) {
- // If an error happens when contacting the config server, we don't know if the update
- // succeeded or not, which might cause the local shard version to differ from the config
- // server, so we clear the metadata to allow another operation to refresh it.
- UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- AutoGetCollection autoColl(opCtx, nss, MODE_IX);
- CollectionShardingRuntime::get(opCtx, nss)->clearFilteringMetadata(opCtx);
- throw;
- }
+ const BSONObj cmdObj = insertRequest.toBSON().addFields(osi.toBSON());
+
+ BatchedCommandResponse unusedResponse;
+ uassertStatusOK(Shard::CommandResponse::processBatchWriteResponse(
+ configShard->runCommand(opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ CollectionType::ConfigNS.db().toString(),
+ cmdObj,
+ Shard::kDefaultConfigCommandTimeout,
+ Shard::RetryPolicy::kIdempotent),
+ &unusedResponse));
}
void broadcastDropCollection(OperationContext* opCtx,
@@ -379,7 +354,6 @@ void broadcastDropCollection(OperationContext* opCtx,
opCtx, nss, participants, executor, osi);
}
-
/**
* This function writes a no-op oplog entry on shardCollection event.
*/
@@ -387,8 +361,6 @@ void _writeOplogMessage(OperationContext* opCtx,
const NamespaceString& nss,
const UUID& uuid,
const BSONObj cmd) {
-
-
BSONObjBuilder cmdBuilder;
cmdBuilder.append("shardCollection", nss.ns());
cmdBuilder.appendElements(cmd);
@@ -417,6 +389,7 @@ void _writeOplogMessage(OperationContext* opCtx,
wunit.commit();
});
}
+
} // namespace
CreateCollectionCoordinator::CreateCollectionCoordinator(ShardingDDLCoordinatorService* service,
@@ -507,16 +480,19 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
_shardKeyPattern->getKeyPattern().toBSON(),
getCollation(opCtx, nss(), _doc.getCollation()).second,
_doc.getUnique().value_or(false))) {
- _result = createCollectionResponseOpt;
_checkCollectionUUIDMismatch(opCtx);
- // The collection was already created and commited but there was a
- // stepdown after the commit.
+
+ // The critical section can still be held here if the node committed the
+ // sharding of the collection but then it stepped down before it managed to
+ // delete the coordinator document
RecoverableCriticalSectionService::get(opCtx)
->releaseRecoverableCriticalSection(
opCtx,
nss(),
_getCriticalSectionReason(),
ShardingCatalogClient::kMajorityWriteConcern);
+
+ _result = createCollectionResponseOpt;
return;
}
@@ -561,10 +537,10 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
if (_splitPolicy->isOptimized()) {
_createChunks(opCtx);
- // Block reads/writes from here on if we need to create
- // the collection on other shards, this way we prevent
- // reads/writes that should be redirected to another
- // shard.
+
+ // Block reads/writes from here on if we need to create the collection on other
+ // shards, this way we prevent reads/writes that should be redirected to another
+ // shard
RecoverableCriticalSectionService::get(opCtx)
->promoteRecoverableCriticalSectionToBlockAlsoReads(
opCtx,
@@ -597,11 +573,8 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
// collections.
if (!_splitPolicy->isOptimized()) {
_createChunks(opCtx);
-
_commit(opCtx);
}
-
- _finalize(opCtx);
}))
.then([this] {
auto opCtxHolder = cc().makeOperationContext();
@@ -824,9 +797,8 @@ void CreateCollectionCoordinator::_createChunks(OperationContext* opCtx) {
opCtx, *_shardKeyPattern, {*_collectionUUID, ShardingState::get(opCtx)->shardId()});
// There must be at least one chunk.
- invariant(!_initialChunks.chunks.empty());
-
- _numChunks = _initialChunks.chunks.size();
+ invariant(_initialChunks);
+ invariant(!_initialChunks->chunks.empty());
}
void CreateCollectionCoordinator::_createCollectionOnNonPrimaryShards(
@@ -843,7 +815,7 @@ void CreateCollectionCoordinator::_createCollectionOnNonPrimaryShards(
NamespaceStringOrUUID nssOrUUID{nss().db().toString(), *_collectionUUID};
auto [collOptions, indexes, idIndex] = getCollectionOptionsAndIndexes(opCtx, nssOrUUID);
- for (const auto& chunk : _initialChunks.chunks) {
+ for (const auto& chunk : _initialChunks->chunks) {
const auto& chunkShardId = chunk.getShard();
if (chunkShardId == dbPrimaryShardId ||
initializedShards.find(chunkShardId) != initializedShards.end()) {
@@ -897,11 +869,11 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) {
// Upsert Chunks.
_doc = _updateSession(opCtx, _doc);
- insertChunks(opCtx, _initialChunks.chunks, getCurrentSession(_doc));
+ insertChunks(opCtx, _initialChunks->chunks, getCurrentSession(_doc));
CollectionType coll(nss(),
- _initialChunks.collVersion().epoch(),
- _initialChunks.collVersion().getTimestamp(),
+ _initialChunks->collVersion().epoch(),
+ _initialChunks->collVersion().getTimestamp(),
Date_t::now(),
*_collectionUUID,
_shardKeyPattern->getKeyPattern());
@@ -921,21 +893,28 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) {
}
_doc = _updateSession(opCtx, _doc);
- updateCatalogEntry(opCtx, nss(), coll, getCurrentSession(_doc));
- _writeOplogMessage(opCtx, nss(), *_collectionUUID, _doc.getCreateCollectionRequest().toBSON());
-}
+ try {
+ insertCollectionEntry(opCtx, nss(), coll, getCurrentSession(_doc));
-void CreateCollectionCoordinator::_finalize(OperationContext* opCtx) {
- LOGV2_DEBUG(5277907, 2, "Create collection _finalize", "namespace"_attr = nss());
+ _writeOplogMessage(
+ opCtx, nss(), *_collectionUUID, _doc.getCreateCollectionRequest().toBSON());
+
+ LOGV2_DEBUG(5277907, 2, "Collection successfully committed", "namespace"_attr = nss());
- try {
forceShardFilteringMetadataRefresh(opCtx, nss());
- } catch (const DBException&) {
+ } catch (const DBException& ex) {
+ LOGV2(5277908,
+ "Failed to obtain collection's shard version, so it will be recovered",
+ "namespace"_attr = nss(),
+ "error"_attr = redact(ex));
+
// If the refresh fails, then set the shard version to UNKNOWN and let a future operation to
// refresh the metadata.
UninterruptibleLockGuard noInterrupt(opCtx->lockState());
AutoGetCollection autoColl(opCtx, nss(), MODE_IX);
CollectionShardingRuntime::get(opCtx, nss())->clearFilteringMetadata(opCtx);
+
+ throw;
}
// Best effort refresh to warm up cache of all involved shards so we can have a cluster ready to
@@ -944,8 +923,9 @@ void CreateCollectionCoordinator::_finalize(OperationContext* opCtx) {
auto dbPrimaryShardId = ShardingState::get(opCtx)->shardId();
std::set<ShardId> shardsRefreshed;
- for (const auto& chunk : _initialChunks.chunks) {
+ for (const auto& chunk : _initialChunks->chunks) {
const auto& chunkShardId = chunk.getShard();
+
if (chunkShardId == dbPrimaryShardId ||
shardsRefreshed.find(chunkShardId) != shardsRefreshed.end()) {
continue;
@@ -963,11 +943,10 @@ void CreateCollectionCoordinator::_finalize(OperationContext* opCtx) {
LOGV2(5277901,
"Created initial chunk(s)",
"namespace"_attr = nss(),
- "numInitialChunks"_attr = _initialChunks.chunks.size(),
- "initialCollectionVersion"_attr = _initialChunks.collVersion());
+ "numInitialChunks"_attr = _initialChunks->chunks.size(),
+ "initialCollectionVersion"_attr = _initialChunks->collVersion());
- auto result = CreateCollectionResponse(
- _initialChunks.chunks[_initialChunks.chunks.size() - 1].getVersion());
+ auto result = CreateCollectionResponse(_initialChunks->chunks.back().getVersion());
result.setCollectionUUID(_collectionUUID);
_result = std::move(result);
@@ -993,8 +972,9 @@ void CreateCollectionCoordinator::_logEndCreateCollection(OperationContext* opCt
collectionDetail.append("version", _result->getCollectionVersion().toString());
if (_collectionEmpty)
collectionDetail.append("empty", *_collectionEmpty);
- if (_numChunks)
- collectionDetail.appendNumber("numChunks", static_cast<long long>(*_numChunks));
+ if (_initialChunks)
+ collectionDetail.appendNumber("numChunks",
+ static_cast<long long>(_initialChunks->chunks.size()));
ShardingLogging::get(opCtx)->logChange(
opCtx, "shardCollection.end", nss().ns(), collectionDetail.obj());
}