summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/s/collection_sharding_runtime.cpp6
-rw-r--r--src/mongo/db/s/collection_sharding_runtime.h7
-rw-r--r--src/mongo/db/s/collection_sharding_state.cpp6
-rw-r--r--src/mongo/db/s/collection_sharding_state.h6
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source.h11
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp121
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.h52
-rw-r--r--src/mongo/db/s/migration_source_manager.cpp8
-rw-r--r--src/mongo/db/s/migration_source_manager.h5
-rw-r--r--src/mongo/db/s/move_chunk_command.cpp4
-rw-r--r--src/mongo/db/s/op_observer_sharding_impl.cpp40
11 files changed, 165 insertions, 101 deletions
diff --git a/src/mongo/db/s/collection_sharding_runtime.cpp b/src/mongo/db/s/collection_sharding_runtime.cpp
index 05fbb152475..684ae740a00 100644
--- a/src/mongo/db/s/collection_sharding_runtime.cpp
+++ b/src/mongo/db/s/collection_sharding_runtime.cpp
@@ -86,6 +86,12 @@ CollectionShardingRuntime* CollectionShardingRuntime::get(OperationContext* opCt
return checked_cast<CollectionShardingRuntime*>(css);
}
+CollectionShardingRuntime* CollectionShardingRuntime::get_UNSAFE(ServiceContext* svcCtx,
+ const NamespaceString& nss) {
+ auto* const css = CollectionShardingState::get_UNSAFE(svcCtx, nss);
+ return checked_cast<CollectionShardingRuntime*>(css);
+}
+
void CollectionShardingRuntime::setFilteringMetadata(OperationContext* opCtx,
CollectionMetadata newMetadata) {
invariant(!newMetadata.isSharded() || !isNamespaceAlwaysUnsharded(_nss),
diff --git a/src/mongo/db/s/collection_sharding_runtime.h b/src/mongo/db/s/collection_sharding_runtime.h
index d2aefc10ff4..42d91a18913 100644
--- a/src/mongo/db/s/collection_sharding_runtime.h
+++ b/src/mongo/db/s/collection_sharding_runtime.h
@@ -65,6 +65,13 @@ public:
static CollectionShardingRuntime* get(OperationContext* opCtx, const NamespaceString& nss);
/**
+ * It is the caller's responsibility to ensure that the collection locks for this namespace are
+ * held when this is called. The returned pointer should never be stored.
+ */
+ static CollectionShardingRuntime* get_UNSAFE(ServiceContext* svcCtx,
+ const NamespaceString& nss);
+
+ /**
* Updates the collection's filtering metadata based on changes received from the config server
* and also resolves the pending receives map in case some of these pending receives have
* committed on the config server or have been abandoned by the donor shard.
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index 7c572a029cd..0d8871a0f45 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -148,6 +148,12 @@ CollectionShardingState* CollectionShardingState::get(OperationContext* opCtx,
return &collectionsMap->getOrCreate(nss);
}
+CollectionShardingState* CollectionShardingState::get_UNSAFE(ServiceContext* svcCtx,
+ const NamespaceString& nss) {
+ auto& collectionsMap = CollectionShardingStateMap::get(svcCtx);
+ return &collectionsMap->getOrCreate(nss);
+}
+
void CollectionShardingState::report(OperationContext* opCtx, BSONObjBuilder* builder) {
auto& collectionsMap = CollectionShardingStateMap::get(opCtx->getServiceContext());
collectionsMap->report(opCtx, builder);
diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h
index 9991a1811ab..d62b010bec6 100644
--- a/src/mongo/db/s/collection_sharding_state.h
+++ b/src/mongo/db/s/collection_sharding_state.h
@@ -70,6 +70,12 @@ public:
static CollectionShardingState* get(OperationContext* opCtx, const NamespaceString& nss);
/**
+ * It is the caller's responsibility to ensure that the collection locks for this namespace are
+ * held when this is called. The returned pointer should never be stored.
+ */
+ static CollectionShardingState* get_UNSAFE(ServiceContext* svcCtx, const NamespaceString& nss);
+
+ /**
* Reports all collections which have filtering information associated.
*/
static void report(OperationContext* opCtx, BSONObjBuilder* builder);
diff --git a/src/mongo/db/s/migration_chunk_cloner_source.h b/src/mongo/db/s/migration_chunk_cloner_source.h
index 52c8993163e..c871a3e08a8 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source.h
@@ -156,17 +156,6 @@ public:
const repl::OpTime& opTime,
const repl::OpTime& preImageOpTime) = 0;
- /**
- * Notifies this cloner that a transaction involving the collection being cloned was prepared or
- * committed. It is up to the cloner's implementation to decide what to do with this information
- * and it is valid for the implementation to ignore it.
- *
- * NOTE: Must be called with at least IX lock held on the collection.
- */
- virtual void onTransactionPrepareOrUnpreparedCommit(OperationContext* opCtx,
- const repl::OpTime& opTime) = 0;
-
-
protected:
MigrationChunkClonerSource();
};
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index a78553c804e..2c3f23ff2d4 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -43,6 +43,8 @@
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_process.h"
+#include "mongo/db/s/collection_sharding_runtime.h"
+#include "mongo/db/s/migration_source_manager.h"
#include "mongo/db/s/sharding_statistics.h"
#include "mongo/db/s/start_chunk_clone_request.h"
#include "mongo/db/service_context.h"
@@ -87,6 +89,34 @@ BSONObj createRequestWithSessionId(StringData commandName,
return builder.obj();
}
+const BSONObj& getDocumentKeyFromReplOperation(repl::ReplOperation replOperation,
+ repl::OpTypeEnum opType) {
+ switch (opType) {
+ case repl::OpTypeEnum::kInsert:
+ case repl::OpTypeEnum::kDelete:
+ return replOperation.getObject();
+ case repl::OpTypeEnum::kUpdate:
+ return *replOperation.getObject2();
+ default:
+ MONGO_UNREACHABLE;
+ }
+ MONGO_UNREACHABLE;
+}
+
+const char getOpCharForCrudOpType(repl::OpTypeEnum opType) {
+ switch (opType) {
+ case repl::OpTypeEnum::kInsert:
+ return 'i';
+ case repl::OpTypeEnum::kUpdate:
+ return 'u';
+ case repl::OpTypeEnum::kDelete:
+ return 'd';
+ default:
+ MONGO_UNREACHABLE;
+ }
+ MONGO_UNREACHABLE;
+}
+
} // namespace
/**
@@ -110,8 +140,7 @@ public:
_prePostImageOpTime(prePostImageOpTime) {}
void commit(boost::optional<Timestamp>) override {
- _cloner->_consumeOperationTrackRequestAndAddToTransferModsQueue(
- _idObj, _op, _opTime, _prePostImageOpTime);
+ _cloner->_addToTransferModsQueue(_idObj, _op, _opTime, _prePostImageOpTime);
_cloner->_decrementOutstandingOperationTrackRequests();
}
@@ -127,30 +156,62 @@ private:
const repl::OpTime _prePostImageOpTime;
};
-/**
- * Used to keep track of new transactions that involve documents in any chunk
- * with an ongoing migration.
- */
-class LogPrepareOrCommitOpForShardingHandler final : public RecoveryUnit::Change {
-public:
- LogPrepareOrCommitOpForShardingHandler(MigrationChunkClonerSourceLegacy* cloner,
- const repl::OpTime& opTime)
- : _cloner(cloner), _opTime(opTime) {}
+void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestamp>) {
+ std::set<NamespaceString> namespacesTouchedByTransaction;
- void commit(boost::optional<Timestamp>) override {
- _cloner->_addToSessionMigrationOptimeQueue(
- _opTime, SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction);
- _cloner->_decrementOutstandingOperationTrackRequests();
- }
+ for (const auto& stmt : _stmts) {
+ const auto& nss = stmt.getNss();
- void rollback() override {
- _cloner->_decrementOutstandingOperationTrackRequests();
- }
+ auto csr = CollectionShardingRuntime::get_UNSAFE(_svcCtx, nss);
+ auto msm = MigrationSourceManager::get_UNSAFE(csr);
+ if (!msm) {
+ continue;
+ }
+ auto cloner = dynamic_cast<MigrationChunkClonerSourceLegacy*>(msm->getCloner().get());
-private:
- MigrationChunkClonerSourceLegacy* const _cloner;
- const repl::OpTime _opTime;
-};
+ auto opType = stmt.getOpType();
+ auto documentKey = getDocumentKeyFromReplOperation(stmt, opType);
+
+ auto idElement = documentKey["_id"];
+ if (idElement.eoo()) {
+ warning() << "Received a document with no id, ignoring: " << redact(documentKey);
+ continue;
+ }
+
+ auto const& minKey = cloner->_args.getMinKey();
+ auto const& maxKey = cloner->_args.getMaxKey();
+ auto const& shardKeyPattern = cloner->_shardKeyPattern;
+
+ if (!isInRange(documentKey, minKey, maxKey, shardKeyPattern)) {
+ // If the preImageDoc is not in range but the postImageDoc was, we know that the
+ // document has changed shard keys and no longer belongs in the chunk being cloned.
+ // We will model the deletion of the preImage document so that the destination chunk
+ // does not receive an outdated version of this document.
+ if (opType == repl::OpTypeEnum::kUpdate &&
+ isInRange(stmt.getPreImageDocumentKey(), minKey, maxKey, shardKeyPattern) &&
+ !stmt.getPreImageDocumentKey()["_id"].eoo()) {
+ opType = repl::OpTypeEnum::kDelete;
+ idElement = stmt.getPreImageDocumentKey()["id"];
+ } else {
+ continue;
+ }
+ }
+
+ // Inform the session migration subsystem that a transaction has committed for all involved
+ // namespaces.
+ if (namespacesTouchedByTransaction.find(nss) == namespacesTouchedByTransaction.end()) {
+ cloner->_addToSessionMigrationOptimeQueue(
+ _prepareOrCommitOpTime,
+ SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction);
+
+ namespacesTouchedByTransaction.emplace(nss);
+ }
+
+ // Pass an empty prePostOpTime to the queue because retryable write history doesn't care
+ // about writes in transactions.
+ cloner->_addToTransferModsQueue(idElement.wrap(), getOpCharForCrudOpType(opType), {}, {});
+ }
+}
MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequest request,
const BSONObj& shardKeyPattern,
@@ -456,18 +517,6 @@ void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx,
}
}
-void MigrationChunkClonerSourceLegacy::onTransactionPrepareOrUnpreparedCommit(
- OperationContext* opCtx, const repl::OpTime& opTime) {
-
- invariant(opCtx->getTxnNumber());
-
- if (!_addedOperationToOutstandingOperationTrackRequests()) {
- return;
- }
-
- opCtx->recoveryUnit()->registerChange(new LogPrepareOrCommitOpForShardingHandler(this, opTime));
-}
-
void MigrationChunkClonerSourceLegacy::_addToSessionMigrationOptimeQueue(
const repl::OpTime& opTime,
SessionCatalogMigrationSource::EntryAtOpTimeType entryAtOpTimeType) {
@@ -478,7 +527,7 @@ void MigrationChunkClonerSourceLegacy::_addToSessionMigrationOptimeQueue(
}
}
-void MigrationChunkClonerSourceLegacy::_consumeOperationTrackRequestAndAddToTransferModsQueue(
+void MigrationChunkClonerSourceLegacy::_addToTransferModsQueue(
const BSONObj& idObj,
const char op,
const repl::OpTime& opTime,
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index c737716b93f..e77998d907d 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -54,6 +54,30 @@ class Collection;
class Database;
class RecordId;
+/**
+ * Used to commit work for LogOpForSharding. Used to keep track of changes in documents that are
+ * part of a chunk being migrated.
+ */
+class LogTransactionOperationsForShardingHandler final : public RecoveryUnit::Change {
+public:
+ /**
+ * Invariant: idObj should belong to a document that is part of the active chunk being migrated
+ */
+ LogTransactionOperationsForShardingHandler(ServiceContext* svcCtx,
+ const std::vector<repl::ReplOperation>& stmts,
+ const repl::OpTime& prepareOrCommitOpTime)
+ : _svcCtx(svcCtx), _stmts(stmts), _prepareOrCommitOpTime(prepareOrCommitOpTime) {}
+
+ void commit(boost::optional<Timestamp>) override;
+
+ void rollback() override{};
+
+private:
+ ServiceContext* _svcCtx;
+ std::vector<repl::ReplOperation> _stmts;
+ const repl::OpTime _prepareOrCommitOpTime;
+};
+
class MigrationChunkClonerSourceLegacy final : public MigrationChunkClonerSource {
MigrationChunkClonerSourceLegacy(const MigrationChunkClonerSourceLegacy&) = delete;
MigrationChunkClonerSourceLegacy& operator=(const MigrationChunkClonerSourceLegacy&) = delete;
@@ -91,10 +115,6 @@ public:
const repl::OpTime& opTime,
const repl::OpTime& preImageOpTime) override;
- void onTransactionPrepareOrUnpreparedCommit(OperationContext* opCtx,
- const repl::OpTime& opTime) override;
-
-
// Legacy cloner specific functionality
/**
@@ -183,7 +203,7 @@ public:
private:
friend class LogOpForShardingHandler;
- friend class LogPrepareOrCommitOpForShardingHandler;
+ friend class LogTransactionOperationsForShardingHandler;
// Represents the states in which the cloner can be
enum State { kNew, kCloning, kDone };
@@ -216,18 +236,20 @@ private:
const repl::OpTime& opTime,
SessionCatalogMigrationSource::EntryAtOpTimeType entryAtOpTimeType);
+ void _addToSessionMigrationOptimeQueueForTransactionCommit(
+ const repl::OpTime& opTime,
+ SessionCatalogMigrationSource::EntryAtOpTimeType entryAtOpTimeType);
+
/*
- * Consumes the operation track request and appends the relevant document changes to
- * the appropriate internal data structures (known colloquially as the 'transfer mods queue').
- * These structures track document changes that are part of a part of a chunk being migrated.
- * In doing so, this the method also removes the corresponding operation track request from the
- * operation track requests queue.
+ * Appends the relevant document changes to the appropriate internal data structures (known
+ * colloquially as the 'transfer mods queue'). These structures track document changes that are
+ * part of a part of a chunk being migrated. In doing so, this the method also removes the
+ * corresponding operation track request from the operation track requests queue.
*/
- void _consumeOperationTrackRequestAndAddToTransferModsQueue(
- const BSONObj& idObj,
- const char op,
- const repl::OpTime& opTime,
- const repl::OpTime& prePostImageOpTime);
+ void _addToTransferModsQueue(const BSONObj& idObj,
+ const char op,
+ const repl::OpTime& opTime,
+ const repl::OpTime& prePostImageOpTime);
/**
* Adds an operation to the outstanding operation track requests. Returns false if the cloner
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 8935b676e45..ae210f50e5e 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -127,6 +127,10 @@ MigrationSourceManager* MigrationSourceManager::get(CollectionShardingRuntime* c
return msmForCsr(csr);
}
+MigrationSourceManager* MigrationSourceManager::get_UNSAFE(CollectionShardingRuntime* csr) {
+ return msmForCsr(csr);
+}
+
MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx,
MoveChunkRequest request,
ConnectionString donorConnStr,
@@ -280,8 +284,8 @@ Status MigrationSourceManager::startClone(OperationContext* opCtx) {
auto const readConcernArgs = repl::ReadConcernArgs(
replCoord->getMyLastAppliedOpTime(), repl::ReadConcernLevel::kLocalReadConcern);
- uassertStatusOK(
- waitForReadConcern(opCtx, readConcernArgs, false, PrepareConflictBehavior::kEnforce));
+ uassertStatusOK(waitForReadConcern(
+ opCtx, readConcernArgs, false, PrepareConflictBehavior::kIgnoreConflicts));
}
Status startCloneStatus = _cloneDriver->startClone(opCtx);
diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h
index b567b5ad7ce..cf0a14fd9df 100644
--- a/src/mongo/db/s/migration_source_manager.h
+++ b/src/mongo/db/s/migration_source_manager.h
@@ -77,6 +77,11 @@ public:
*/
static MigrationSourceManager* get(CollectionShardingRuntime* csr,
CollectionShardingRuntime::CSRLock& csrLock);
+ /**
+ * It is the caller's responsibility to ensure that the collection locks for this namespace are
+ * held when this is called. The returned pointer should never be stored.
+ */
+ static MigrationSourceManager* get_UNSAFE(CollectionShardingRuntime* csr);
/**
* Instantiates a new migration source manager with the specified migration parameters. Must be
diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp
index dd62c984292..0b3c40b7564 100644
--- a/src/mongo/db/s/move_chunk_command.cpp
+++ b/src/mongo/db/s/move_chunk_command.cpp
@@ -103,6 +103,10 @@ public:
return true;
}
+ bool canIgnorePrepareConflicts() const override {
+ return true;
+ }
+
Status checkAuthForCommand(Client* client,
const std::string& dbname,
const BSONObj& cmdObj) const override {
diff --git a/src/mongo/db/s/op_observer_sharding_impl.cpp b/src/mongo/db/s/op_observer_sharding_impl.cpp
index 2b2ae6aecdf..1cc5844cde3 100644
--- a/src/mongo/db/s/op_observer_sharding_impl.cpp
+++ b/src/mongo/db/s/op_observer_sharding_impl.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/catalog_raii.h"
#include "mongo/db/s/collection_sharding_runtime.h"
+#include "mongo/db/s/migration_chunk_cloner_source_legacy.h"
#include "mongo/db/s/migration_source_manager.h"
namespace mongo {
@@ -166,43 +167,8 @@ void OpObserverShardingImpl::shardObserveTransactionPrepareOrUnpreparedCommit(
const std::vector<repl::ReplOperation>& stmts,
const repl::OpTime& prepareOrCommitOptime) {
- std::set<NamespaceString> namespacesTouchedByTransaction;
-
- for (const auto& stmt : stmts) {
- const auto& nss = stmt.getNss();
-
- invariant(opCtx->lockState()->isCollectionLockedForMode(nss, MODE_IS));
-
- auto csr = CollectionShardingRuntime::get(opCtx, nss);
- auto csrLock = CollectionShardingRuntime::CSRLock::lock(opCtx, csr);
- auto msm = MigrationSourceManager::get(csr, csrLock);
- if (!msm) {
- continue;
- }
-
- if (namespacesTouchedByTransaction.find(nss) == namespacesTouchedByTransaction.end()) {
- msm->getCloner()->onTransactionPrepareOrUnpreparedCommit(opCtx, prepareOrCommitOptime);
- namespacesTouchedByTransaction.insert(nss);
- }
-
- const auto& opType = stmt.getOpType();
-
- // We pass an empty opTime to observers because retryable write history doesn't care about
- // writes in transactions.
- if (opType == repl::OpTypeEnum::kInsert) {
- msm->getCloner()->onInsertOp(opCtx, stmt.getObject(), {});
- } else if (opType == repl::OpTypeEnum::kUpdate) {
- if (auto updateDoc = stmt.getObject2()) {
- msm->getCloner()->onUpdateOp(
- opCtx, stmt.getPreImageDocumentKey(), *updateDoc, {}, {});
- }
- } else if (opType == repl::OpTypeEnum::kDelete) {
- if (isMigratingWithCSRLock(csr, csrLock, stmt.getObject())) {
- msm->getCloner()->onDeleteOp(
- opCtx, getDocumentKey(opCtx, nss, stmt.getObject()), {}, {});
- }
- }
- }
+ opCtx->recoveryUnit()->registerChange(new LogTransactionOperationsForShardingHandler(
+ opCtx->getServiceContext(), stmts, prepareOrCommitOptime));
}
} // namespace mongo