summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/collection_sharding_state.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/collection_sharding_state.cpp')
-rw-r--r--src/mongo/db/s/collection_sharding_state.cpp284
1 files changed, 0 insertions, 284 deletions
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index 3c8d4d6e8d6..d19955f6a3c 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -37,29 +37,16 @@
#include "mongo/db/catalog_raii.h"
#include "mongo/db/client.h"
#include "mongo/db/operation_context.h"
-#include "mongo/db/repl/replication_coordinator.h"
-#include "mongo/db/s/config/sharding_catalog_manager.h"
#include "mongo/db/s/migration_chunk_cloner_source.h"
#include "mongo/db/s/migration_source_manager.h"
#include "mongo/db/s/operation_sharding_state.h"
-#include "mongo/db/s/shard_identity_rollback_notifier.h"
#include "mongo/db/s/sharded_connection_info.h"
#include "mongo/db/s/sharding_state.h"
-#include "mongo/db/s/type_shard_identity.h"
-#include "mongo/db/server_options.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/network_interface_thread_pool.h"
#include "mongo/executor/thread_pool_task_executor.h"
-#include "mongo/s/balancer_configuration.h"
-#include "mongo/s/catalog/type_config_version.h"
-#include "mongo/s/catalog/type_shard.h"
-#include "mongo/s/catalog/type_shard_collection.h"
-#include "mongo/s/catalog_cache.h"
-#include "mongo/s/catalog_cache_loader.h"
-#include "mongo/s/cluster_identity_loader.h"
-#include "mongo/s/grid.h"
#include "mongo/s/stale_exception.h"
#include "mongo/util/log.h"
@@ -169,64 +156,6 @@ private:
const auto getCollectionShardingStateMap =
ServiceContext::declareDecoration<CollectionShardingStateMap>();
-/**
- * Used to perform shard identity initialization once it is certain that the document is committed.
- */
-class ShardIdentityLogOpHandler final : public RecoveryUnit::Change {
-public:
- ShardIdentityLogOpHandler(OperationContext* opCtx, ShardIdentityType shardIdentity)
- : _opCtx(opCtx), _shardIdentity(std::move(shardIdentity)) {}
-
- void commit() override {
- fassertNoTrace(
- 40071, ShardingState::get(_opCtx)->initializeFromShardIdentity(_opCtx, _shardIdentity));
- }
-
- void rollback() override {}
-
-private:
- OperationContext* _opCtx;
- const ShardIdentityType _shardIdentity;
-};
-
-/**
- * Used to notify the catalog cache loader of a new collection version and invalidate the in-memory
- * routing table cache once the oplog updates are committed and become visible.
- */
-class CollectionVersionLogOpHandler final : public RecoveryUnit::Change {
-public:
- CollectionVersionLogOpHandler(OperationContext* opCtx, const NamespaceString& nss)
- : _opCtx(opCtx), _nss(nss) {}
-
- void commit() override {
- invariant(_opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
-
- CatalogCacheLoader::get(_opCtx).notifyOfCollectionVersionUpdate(_nss);
-
- // This is a hack to get around CollectionShardingState::refreshMetadata() requiring the X
- // lock: markNotShardedAtStepdown() doesn't have a lock check. Temporary measure until
- // SERVER-31595 removes the X lock requirement.
- CollectionShardingState::get(_opCtx, _nss)->markNotShardedAtStepdown();
- }
-
- void rollback() override {}
-
-private:
- OperationContext* _opCtx;
- const NamespaceString _nss;
-};
-
-/**
- * Caller must hold the global lock in some mode other than MODE_NONE.
- */
-bool isStandaloneOrPrimary(OperationContext* opCtx) {
- dassert(opCtx->lockState()->isLocked());
- auto replCoord = repl::ReplicationCoordinator::get(opCtx);
- bool isReplSet = replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet;
- return !isReplSet || (repl::ReplicationCoordinator::get(opCtx)->getMemberState() ==
- repl::MemberState::RS_PRIMARY);
-}
-
} // namespace
CollectionShardingState::CollectionShardingState(ServiceContext* sc, NamespaceString nss)
@@ -394,24 +323,6 @@ void CollectionShardingState::onInsertOp(OperationContext* opCtx,
const repl::OpTime& opTime) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
- if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
- if (_nss == NamespaceString::kServerConfigurationNamespace) {
- if (auto idElem = insertedDoc["_id"]) {
- if (idElem.str() == ShardIdentityType::IdName) {
- auto shardIdentityDoc =
- uassertStatusOK(ShardIdentityType::fromBSON(insertedDoc));
- uassertStatusOK(shardIdentityDoc.validate());
- opCtx->recoveryUnit()->registerChange(
- new ShardIdentityLogOpHandler(opCtx, std::move(shardIdentityDoc)));
- }
- }
- }
-
- if (ShardingState::get(opCtx)->enabled()) {
- _incrementChunkOnInsertOrUpdate(opCtx, insertedDoc, insertedDoc.objsize());
- }
- }
-
checkShardVersionOrThrow(opCtx);
if (_sourceMgr) {
@@ -420,23 +331,11 @@ void CollectionShardingState::onInsertOp(OperationContext* opCtx,
}
void CollectionShardingState::onUpdateOp(OperationContext* opCtx,
- const BSONObj& query,
- const BSONObj& update,
const BSONObj& updatedDoc,
const repl::OpTime& opTime,
const repl::OpTime& prePostImageOpTime) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
- if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
- if (_nss.ns() == NamespaceString::kShardConfigCollectionsCollectionName) {
- _onConfigCollectionsUpdateOp(opCtx, query, update, updatedDoc);
- }
-
- if (ShardingState::get(opCtx)->enabled()) {
- _incrementChunkOnInsertOrUpdate(opCtx, updatedDoc, update.objsize());
- }
- }
-
checkShardVersionOrThrow(opCtx);
if (_sourceMgr) {
@@ -455,41 +354,6 @@ void CollectionShardingState::onDeleteOp(OperationContext* opCtx,
const repl::OpTime& preImageOpTime) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
- if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
- if (_nss.ns() == NamespaceString::kShardConfigCollectionsCollectionName) {
- _onConfigDeleteInvalidateCachedMetadataAndNotify(opCtx, deleteState.documentKey);
- }
-
- if (_nss == NamespaceString::kServerConfigurationNamespace) {
- if (auto idElem = deleteState.documentKey["_id"]) {
- auto idStr = idElem.str();
- if (idStr == ShardIdentityType::IdName) {
- if (!repl::ReplicationCoordinator::get(opCtx)->getMemberState().rollback()) {
- uasserted(40070,
- "cannot delete shardIdentity document while in --shardsvr mode");
- } else {
- warning() << "Shard identity document rolled back. Will shut down after "
- "finishing rollback.";
- ShardIdentityRollbackNotifier::get(opCtx)->recordThatRollbackHappened();
- }
- }
- }
- }
- }
-
- if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
- if (_nss == VersionType::ConfigNS) {
- if (!repl::ReplicationCoordinator::get(opCtx)->getMemberState().rollback()) {
- uasserted(40302, "cannot delete config.version document while in --configsvr mode");
- } else {
- // Throw out any cached information related to the cluster ID.
- ShardingCatalogManager::get(opCtx)
- ->discardCachedConfigDatabaseInitializationState();
- ClusterIdentityLoader::get(opCtx)->discardCachedClusterId();
- }
- }
- }
-
checkShardVersionOrThrow(opCtx);
if (_sourceMgr && deleteState.isMigrating) {
@@ -497,100 +361,6 @@ void CollectionShardingState::onDeleteOp(OperationContext* opCtx,
}
}
-void CollectionShardingState::onDropCollection(OperationContext* opCtx,
- const NamespaceString& collectionName) {
- dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
-
- if (serverGlobalParams.clusterRole == ClusterRole::ShardServer &&
- _nss == NamespaceString::kServerConfigurationNamespace) {
- // Dropping system collections is not allowed for end users.
- invariant(!opCtx->writesAreReplicated());
- invariant(repl::ReplicationCoordinator::get(opCtx)->getMemberState().rollback());
-
- // Can't confirm whether there was a ShardIdentity document or not yet, so assume there was
- // one and shut down the process to clear the in-memory sharding state.
- warning() << "admin.system.version collection rolled back. Will shut down after "
- "finishing rollback";
- ShardIdentityRollbackNotifier::get(opCtx)->recordThatRollbackHappened();
- }
-
- if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
- if (_nss == VersionType::ConfigNS) {
- if (!repl::ReplicationCoordinator::get(opCtx)->getMemberState().rollback()) {
- uasserted(40303, "cannot drop config.version document while in --configsvr mode");
- } else {
- // Throw out any cached information related to the cluster ID.
- ShardingCatalogManager::get(opCtx)
- ->discardCachedConfigDatabaseInitializationState();
- ClusterIdentityLoader::get(opCtx)->discardCachedClusterId();
- }
- }
- }
-}
-
-void CollectionShardingState::_onConfigCollectionsUpdateOp(OperationContext* opCtx,
- const BSONObj& query,
- const BSONObj& update,
- const BSONObj& updatedDoc) {
- dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
- invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer);
-
- // Notification of routing table changes are only needed on secondaries.
- if (isStandaloneOrPrimary(opCtx)) {
- return;
- }
-
- // Extract which user collection was updated.
- std::string updatedCollection;
- fassert(40477,
- bsonExtractStringField(query, ShardCollectionType::ns.name(), &updatedCollection));
-
- // Parse the '$set' update.
- BSONElement setElement;
- Status setStatus = bsonExtractTypedField(update, StringData("$set"), Object, &setElement);
- if (setStatus.isOK()) {
- BSONObj setField = setElement.Obj();
- const NamespaceString updatedNss(updatedCollection);
-
- // Need the WUOW to retain the lock for CollectionVersionLogOpHandler::commit().
- AutoGetCollection autoColl(opCtx, updatedNss, MODE_IX);
-
- if (setField.hasField(ShardCollectionType::lastRefreshedCollectionVersion.name())) {
- opCtx->recoveryUnit()->registerChange(
- new CollectionVersionLogOpHandler(opCtx, updatedNss));
- }
-
- if (setField.hasField(ShardCollectionType::enterCriticalSectionCounter.name())) {
- // This is a hack to get around CollectionShardingState::refreshMetadata() requiring the
- // X lock: markNotShardedAtStepdown() doesn't have a lock check. Temporary measure until
- // SERVER-31595 removes the X lock requirement.
- CollectionShardingState::get(opCtx, updatedNss)->markNotShardedAtStepdown();
- }
- }
-}
-
-void CollectionShardingState::_onConfigDeleteInvalidateCachedMetadataAndNotify(
- OperationContext* opCtx, const BSONObj& query) {
- dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
- invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer);
-
- // Notification of routing table changes are only needed on secondaries.
- if (isStandaloneOrPrimary(opCtx)) {
- return;
- }
-
- // Extract which collection entry is being deleted from the _id field.
- std::string deletedCollection;
- fassert(40479,
- bsonExtractStringField(query, ShardCollectionType::ns.name(), &deletedCollection));
- const NamespaceString deletedNss(deletedCollection);
-
- // Need the WUOW to retain the lock for CollectionVersionLogOpHandler::commit().
- AutoGetCollection autoColl(opCtx, deletedNss, MODE_IX);
-
- opCtx->recoveryUnit()->registerChange(new CollectionVersionLogOpHandler(opCtx, deletedNss));
-}
-
bool CollectionShardingState::_checkShardVersionOk(OperationContext* opCtx,
std::string* errmsg,
ChunkVersion* expectedShardVersion,
@@ -681,58 +451,4 @@ bool CollectionShardingState::_checkShardVersionOk(OperationContext* opCtx,
MONGO_UNREACHABLE;
}
-uint64_t CollectionShardingState::_incrementChunkOnInsertOrUpdate(OperationContext* opCtx,
- const BSONObj& document,
- long dataWritten) {
-
- // Here, get the collection metadata and check if it exists. If it doesn't exist, then the
- // collection is not sharded, and we can simply return -1.
- ScopedCollectionMetadata metadata = getMetadata();
- if (!metadata) {
- return -1;
- }
-
- std::shared_ptr<ChunkManager> cm = metadata->getChunkManager();
- const ShardKeyPattern& shardKeyPattern = cm->getShardKeyPattern();
-
- // Each inserted/updated document should contain the shard key. The only instance in which a
- // document could not contain a shard key is if the insert/update is performed through mongod
- // explicitly, as opposed to first routed through mongos.
- BSONObj shardKey = shardKeyPattern.extractShardKeyFromDoc(document);
- if (shardKey.woCompare(BSONObj()) == 0) {
- warning() << "inserting document " << document.toString() << " without shard key pattern "
- << shardKeyPattern << " into a sharded collection";
- return -1;
- }
-
- // Use the shard key to locate the chunk into which the document was updated, and increment the
- // number of bytes tracked for the chunk. Note that we can assume the simple collation, because
- // shard keys do not support non-simple collations.
- auto chunk = cm->findIntersectingChunkWithSimpleCollation(shardKey);
- chunk->addBytesWritten(dataWritten);
-
- // If the chunk becomes too large, then we call the ChunkSplitter to schedule a split. Then, we
- // reset the tracking for that chunk to 0.
- if (_shouldSplitChunk(opCtx, shardKeyPattern, *chunk)) {
- // TODO: call ChunkSplitter here
- chunk->clearBytesWritten();
- }
-
- return chunk->getBytesWritten();
-}
-
-bool CollectionShardingState::_shouldSplitChunk(OperationContext* opCtx,
- const ShardKeyPattern& shardKeyPattern,
- const Chunk& chunk) {
-
- const auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration();
- invariant(balancerConfig);
-
- const KeyPattern keyPattern = shardKeyPattern.getKeyPattern();
- const bool minIsInf = (0 == keyPattern.globalMin().woCompare(chunk.getMin()));
- const bool maxIsInf = (0 == keyPattern.globalMax().woCompare(chunk.getMax()));
-
- return chunk.shouldSplit(balancerConfig->getMaxChunkSizeBytes(), minIsInf, maxIsInf);
-}
-
} // namespace mongo