diff options
Diffstat (limited to 'src/mongo/db/s/collection_sharding_state.cpp')
-rw-r--r-- | src/mongo/db/s/collection_sharding_state.cpp | 284 |
1 files changed, 0 insertions, 284 deletions
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index 3c8d4d6e8d6..d19955f6a3c 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -37,29 +37,16 @@ #include "mongo/db/catalog_raii.h" #include "mongo/db/client.h" #include "mongo/db/operation_context.h" -#include "mongo/db/repl/replication_coordinator.h" -#include "mongo/db/s/config/sharding_catalog_manager.h" #include "mongo/db/s/migration_chunk_cloner_source.h" #include "mongo/db/s/migration_source_manager.h" #include "mongo/db/s/operation_sharding_state.h" -#include "mongo/db/s/shard_identity_rollback_notifier.h" #include "mongo/db/s/sharded_connection_info.h" #include "mongo/db/s/sharding_state.h" -#include "mongo/db/s/type_shard_identity.h" -#include "mongo/db/server_options.h" #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" #include "mongo/executor/network_interface_factory.h" #include "mongo/executor/network_interface_thread_pool.h" #include "mongo/executor/thread_pool_task_executor.h" -#include "mongo/s/balancer_configuration.h" -#include "mongo/s/catalog/type_config_version.h" -#include "mongo/s/catalog/type_shard.h" -#include "mongo/s/catalog/type_shard_collection.h" -#include "mongo/s/catalog_cache.h" -#include "mongo/s/catalog_cache_loader.h" -#include "mongo/s/cluster_identity_loader.h" -#include "mongo/s/grid.h" #include "mongo/s/stale_exception.h" #include "mongo/util/log.h" @@ -169,64 +156,6 @@ private: const auto getCollectionShardingStateMap = ServiceContext::declareDecoration<CollectionShardingStateMap>(); -/** - * Used to perform shard identity initialization once it is certain that the document is committed. - */ -class ShardIdentityLogOpHandler final : public RecoveryUnit::Change { -public: - ShardIdentityLogOpHandler(OperationContext* opCtx, ShardIdentityType shardIdentity) - : _opCtx(opCtx), _shardIdentity(std::move(shardIdentity)) {} - - void commit() override { - fassertNoTrace( - 40071, ShardingState::get(_opCtx)->initializeFromShardIdentity(_opCtx, _shardIdentity)); - } - - void rollback() override {} - -private: - OperationContext* _opCtx; - const ShardIdentityType _shardIdentity; -}; - -/** - * Used to notify the catalog cache loader of a new collection version and invalidate the in-memory - * routing table cache once the oplog updates are committed and become visible. - */ -class CollectionVersionLogOpHandler final : public RecoveryUnit::Change { -public: - CollectionVersionLogOpHandler(OperationContext* opCtx, const NamespaceString& nss) - : _opCtx(opCtx), _nss(nss) {} - - void commit() override { - invariant(_opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); - - CatalogCacheLoader::get(_opCtx).notifyOfCollectionVersionUpdate(_nss); - - // This is a hack to get around CollectionShardingState::refreshMetadata() requiring the X - // lock: markNotShardedAtStepdown() doesn't have a lock check. Temporary measure until - // SERVER-31595 removes the X lock requirement. - CollectionShardingState::get(_opCtx, _nss)->markNotShardedAtStepdown(); - } - - void rollback() override {} - -private: - OperationContext* _opCtx; - const NamespaceString _nss; -}; - -/** - * Caller must hold the global lock in some mode other than MODE_NONE. - */ -bool isStandaloneOrPrimary(OperationContext* opCtx) { - dassert(opCtx->lockState()->isLocked()); - auto replCoord = repl::ReplicationCoordinator::get(opCtx); - bool isReplSet = replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; - return !isReplSet || (repl::ReplicationCoordinator::get(opCtx)->getMemberState() == - repl::MemberState::RS_PRIMARY); -} - } // namespace CollectionShardingState::CollectionShardingState(ServiceContext* sc, NamespaceString nss) @@ -394,24 +323,6 @@ void CollectionShardingState::onInsertOp(OperationContext* opCtx, const repl::OpTime& opTime) { dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); - if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { - if (_nss == NamespaceString::kServerConfigurationNamespace) { - if (auto idElem = insertedDoc["_id"]) { - if (idElem.str() == ShardIdentityType::IdName) { - auto shardIdentityDoc = - uassertStatusOK(ShardIdentityType::fromBSON(insertedDoc)); - uassertStatusOK(shardIdentityDoc.validate()); - opCtx->recoveryUnit()->registerChange( - new ShardIdentityLogOpHandler(opCtx, std::move(shardIdentityDoc))); - } - } - } - - if (ShardingState::get(opCtx)->enabled()) { - _incrementChunkOnInsertOrUpdate(opCtx, insertedDoc, insertedDoc.objsize()); - } - } - checkShardVersionOrThrow(opCtx); if (_sourceMgr) { @@ -420,23 +331,11 @@ void CollectionShardingState::onInsertOp(OperationContext* opCtx, } void CollectionShardingState::onUpdateOp(OperationContext* opCtx, - const BSONObj& query, - const BSONObj& update, const BSONObj& updatedDoc, const repl::OpTime& opTime, const repl::OpTime& prePostImageOpTime) { dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); - if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { - if (_nss.ns() == NamespaceString::kShardConfigCollectionsCollectionName) { - _onConfigCollectionsUpdateOp(opCtx, query, update, updatedDoc); - } - - if (ShardingState::get(opCtx)->enabled()) { - _incrementChunkOnInsertOrUpdate(opCtx, updatedDoc, update.objsize()); - } - } - checkShardVersionOrThrow(opCtx); if (_sourceMgr) { @@ -455,41 +354,6 @@ void CollectionShardingState::onDeleteOp(OperationContext* opCtx, const repl::OpTime& preImageOpTime) { dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); - if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { - if (_nss.ns() == NamespaceString::kShardConfigCollectionsCollectionName) { - _onConfigDeleteInvalidateCachedMetadataAndNotify(opCtx, deleteState.documentKey); - } - - if (_nss == NamespaceString::kServerConfigurationNamespace) { - if (auto idElem = deleteState.documentKey["_id"]) { - auto idStr = idElem.str(); - if (idStr == ShardIdentityType::IdName) { - if (!repl::ReplicationCoordinator::get(opCtx)->getMemberState().rollback()) { - uasserted(40070, - "cannot delete shardIdentity document while in --shardsvr mode"); - } else { - warning() << "Shard identity document rolled back. Will shut down after " - "finishing rollback."; - ShardIdentityRollbackNotifier::get(opCtx)->recordThatRollbackHappened(); - } - } - } - } - } - - if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { - if (_nss == VersionType::ConfigNS) { - if (!repl::ReplicationCoordinator::get(opCtx)->getMemberState().rollback()) { - uasserted(40302, "cannot delete config.version document while in --configsvr mode"); - } else { - // Throw out any cached information related to the cluster ID. - ShardingCatalogManager::get(opCtx) - ->discardCachedConfigDatabaseInitializationState(); - ClusterIdentityLoader::get(opCtx)->discardCachedClusterId(); - } - } - } - checkShardVersionOrThrow(opCtx); if (_sourceMgr && deleteState.isMigrating) { @@ -497,100 +361,6 @@ void CollectionShardingState::onDeleteOp(OperationContext* opCtx, } } -void CollectionShardingState::onDropCollection(OperationContext* opCtx, - const NamespaceString& collectionName) { - dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); - - if (serverGlobalParams.clusterRole == ClusterRole::ShardServer && - _nss == NamespaceString::kServerConfigurationNamespace) { - // Dropping system collections is not allowed for end users. - invariant(!opCtx->writesAreReplicated()); - invariant(repl::ReplicationCoordinator::get(opCtx)->getMemberState().rollback()); - - // Can't confirm whether there was a ShardIdentity document or not yet, so assume there was - // one and shut down the process to clear the in-memory sharding state. - warning() << "admin.system.version collection rolled back. Will shut down after " - "finishing rollback"; - ShardIdentityRollbackNotifier::get(opCtx)->recordThatRollbackHappened(); - } - - if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { - if (_nss == VersionType::ConfigNS) { - if (!repl::ReplicationCoordinator::get(opCtx)->getMemberState().rollback()) { - uasserted(40303, "cannot drop config.version document while in --configsvr mode"); - } else { - // Throw out any cached information related to the cluster ID. - ShardingCatalogManager::get(opCtx) - ->discardCachedConfigDatabaseInitializationState(); - ClusterIdentityLoader::get(opCtx)->discardCachedClusterId(); - } - } - } -} - -void CollectionShardingState::_onConfigCollectionsUpdateOp(OperationContext* opCtx, - const BSONObj& query, - const BSONObj& update, - const BSONObj& updatedDoc) { - dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); - invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer); - - // Notification of routing table changes are only needed on secondaries. - if (isStandaloneOrPrimary(opCtx)) { - return; - } - - // Extract which user collection was updated. - std::string updatedCollection; - fassert(40477, - bsonExtractStringField(query, ShardCollectionType::ns.name(), &updatedCollection)); - - // Parse the '$set' update. - BSONElement setElement; - Status setStatus = bsonExtractTypedField(update, StringData("$set"), Object, &setElement); - if (setStatus.isOK()) { - BSONObj setField = setElement.Obj(); - const NamespaceString updatedNss(updatedCollection); - - // Need the WUOW to retain the lock for CollectionVersionLogOpHandler::commit(). - AutoGetCollection autoColl(opCtx, updatedNss, MODE_IX); - - if (setField.hasField(ShardCollectionType::lastRefreshedCollectionVersion.name())) { - opCtx->recoveryUnit()->registerChange( - new CollectionVersionLogOpHandler(opCtx, updatedNss)); - } - - if (setField.hasField(ShardCollectionType::enterCriticalSectionCounter.name())) { - // This is a hack to get around CollectionShardingState::refreshMetadata() requiring the - // X lock: markNotShardedAtStepdown() doesn't have a lock check. Temporary measure until - // SERVER-31595 removes the X lock requirement. - CollectionShardingState::get(opCtx, updatedNss)->markNotShardedAtStepdown(); - } - } -} - -void CollectionShardingState::_onConfigDeleteInvalidateCachedMetadataAndNotify( - OperationContext* opCtx, const BSONObj& query) { - dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); - invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer); - - // Notification of routing table changes are only needed on secondaries. - if (isStandaloneOrPrimary(opCtx)) { - return; - } - - // Extract which collection entry is being deleted from the _id field. - std::string deletedCollection; - fassert(40479, - bsonExtractStringField(query, ShardCollectionType::ns.name(), &deletedCollection)); - const NamespaceString deletedNss(deletedCollection); - - // Need the WUOW to retain the lock for CollectionVersionLogOpHandler::commit(). - AutoGetCollection autoColl(opCtx, deletedNss, MODE_IX); - - opCtx->recoveryUnit()->registerChange(new CollectionVersionLogOpHandler(opCtx, deletedNss)); -} - bool CollectionShardingState::_checkShardVersionOk(OperationContext* opCtx, std::string* errmsg, ChunkVersion* expectedShardVersion, @@ -681,58 +451,4 @@ bool CollectionShardingState::_checkShardVersionOk(OperationContext* opCtx, MONGO_UNREACHABLE; } -uint64_t CollectionShardingState::_incrementChunkOnInsertOrUpdate(OperationContext* opCtx, - const BSONObj& document, - long dataWritten) { - - // Here, get the collection metadata and check if it exists. If it doesn't exist, then the - // collection is not sharded, and we can simply return -1. - ScopedCollectionMetadata metadata = getMetadata(); - if (!metadata) { - return -1; - } - - std::shared_ptr<ChunkManager> cm = metadata->getChunkManager(); - const ShardKeyPattern& shardKeyPattern = cm->getShardKeyPattern(); - - // Each inserted/updated document should contain the shard key. The only instance in which a - // document could not contain a shard key is if the insert/update is performed through mongod - // explicitly, as opposed to first routed through mongos. - BSONObj shardKey = shardKeyPattern.extractShardKeyFromDoc(document); - if (shardKey.woCompare(BSONObj()) == 0) { - warning() << "inserting document " << document.toString() << " without shard key pattern " - << shardKeyPattern << " into a sharded collection"; - return -1; - } - - // Use the shard key to locate the chunk into which the document was updated, and increment the - // number of bytes tracked for the chunk. Note that we can assume the simple collation, because - // shard keys do not support non-simple collations. - auto chunk = cm->findIntersectingChunkWithSimpleCollation(shardKey); - chunk->addBytesWritten(dataWritten); - - // If the chunk becomes too large, then we call the ChunkSplitter to schedule a split. Then, we - // reset the tracking for that chunk to 0. - if (_shouldSplitChunk(opCtx, shardKeyPattern, *chunk)) { - // TODO: call ChunkSplitter here - chunk->clearBytesWritten(); - } - - return chunk->getBytesWritten(); -} - -bool CollectionShardingState::_shouldSplitChunk(OperationContext* opCtx, - const ShardKeyPattern& shardKeyPattern, - const Chunk& chunk) { - - const auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration(); - invariant(balancerConfig); - - const KeyPattern keyPattern = shardKeyPattern.getKeyPattern(); - const bool minIsInf = (0 == keyPattern.globalMin().woCompare(chunk.getMin())); - const bool maxIsInf = (0 == keyPattern.globalMax().woCompare(chunk.getMax())); - - return chunk.shouldSplit(balancerConfig->getMaxChunkSizeBytes(), minIsInf, maxIsInf); -} - } // namespace mongo |