summary | refs | log | tree | commit | diff
path: root/src/mongo/db/s
diff options
context:
space:
mode:
author: Alex Taskov <alex.taskov@mongodb.com> 2020-02-21 16:26:45 -0500
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2020-02-21 23:09:49 +0000
commit: 616564d1a5ea7217aa8b1e7f4c6d28800f0b65c9 (patch)
tree: ac5703336b02d40e83f7b14c954b41f6d3ceb1b1 /src/mongo/db/s
parent: d6c1d5f25f895cace27e2b165a1bee9e1865a2fd (diff)
download: mongo-616564d1a5ea7217aa8b1e7f4c6d28800f0b65c9.tar.gz
SERVER-45952 moveChunk command should re-use lsid and use txnNumber that increments by two for each migration
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r-- src/mongo/db/s/migration_chunk_cloner_source.h 3
-rw-r--r-- src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp 113
-rw-r--r-- src/mongo/db/s/migration_chunk_cloner_source_legacy.h 3
-rw-r--r-- src/mongo/db/s/migration_coordinator.cpp 56
-rw-r--r-- src/mongo/db/s/migration_coordinator.h 9
-rw-r--r-- src/mongo/db/s/migration_coordinator_document.idl 3
-rw-r--r-- src/mongo/db/s/migration_source_manager.cpp 22
-rw-r--r-- src/mongo/db/s/migration_source_manager.h 1
-rw-r--r-- src/mongo/db/s/migration_util.cpp 19
9 files changed, 161 insertions, 68 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source.h b/src/mongo/db/s/migration_chunk_cloner_source.h
index 00e17e8fe3a..8d8fd7fa403 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source.h
@@ -79,6 +79,9 @@ public:
const LogicalSessionId& lsid,
TxnNumber txnNumber) = 0;
+ // TODO (SERVER-44787): Remove this function after 4.4 is released.
+ virtual Status startClone(OperationContext* opCtx) = 0;
+
/**
* Blocking method, which uses some custom selected logic for deciding whether it is appropriate
* for the donor shard to enter critical section.
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 0b6d87cd791..a3941cfde46 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -44,7 +44,6 @@
#include "mongo/db/repl/replication_process.h"
#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/migration_source_manager.h"
-#include "mongo/db/s/sharding_runtime_d_params_gen.h"
#include "mongo/db/s/sharding_statistics.h"
#include "mongo/db/s/start_chunk_clone_request.h"
#include "mongo/db/service_context.h"
@@ -283,37 +282,91 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx,
// Tell the recipient shard to start cloning
BSONObjBuilder cmdBuilder;
- auto fcvVersion = serverGlobalParams.featureCompatibility.getVersion();
- if (fcvVersion == ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo44 &&
- !disableResumableRangeDeleter.load()) {
- StartChunkCloneRequest::appendAsCommand(&cmdBuilder,
- _args.getNss(),
- migrationId,
- lsid,
- txnNumber,
- _sessionId,
- _donorConnStr,
- _args.getFromShardId(),
- _args.getToShardId(),
- _args.getMinKey(),
- _args.getMaxKey(),
- _shardKeyPattern.toBSON(),
- _args.getSecondaryThrottle());
- } else {
- // TODO (SERVER-44787): Remove this overload after 4.4 is released AND
- // disableResumableRangeDeleter has been removed from server parameters.
- StartChunkCloneRequest::appendAsCommand(&cmdBuilder,
- _args.getNss(),
- _sessionId,
- _donorConnStr,
- _args.getFromShardId(),
- _args.getToShardId(),
- _args.getMinKey(),
- _args.getMaxKey(),
- _shardKeyPattern.toBSON(),
- _args.getSecondaryThrottle());
+ StartChunkCloneRequest::appendAsCommand(&cmdBuilder,
+ _args.getNss(),
+ migrationId,
+ lsid,
+ txnNumber,
+ _sessionId,
+ _donorConnStr,
+ _args.getFromShardId(),
+ _args.getToShardId(),
+ _args.getMinKey(),
+ _args.getMaxKey(),
+ _shardKeyPattern.toBSON(),
+ _args.getSecondaryThrottle());
+
+ auto startChunkCloneResponseStatus = _callRecipient(cmdBuilder.obj());
+ if (!startChunkCloneResponseStatus.isOK()) {
+ return startChunkCloneResponseStatus.getStatus();
+ }
+
+ // TODO (Kal): Setting the state to kCloning below means that if cancelClone was called we will
+ // send a cancellation command to the recipient. The reason to limit the cases when we send
+ // cancellation is for backwards compatibility with 3.2 nodes, which cannot differentiate
+ // between cancellations for different migration sessions. It is thus possible that a second
+ // migration from different donor, but the same recipient would certainly abort an already
+ // running migration.
+ stdx::lock_guard<Latch> sl(_mutex);
+ _state = kCloning;
+
+ return Status::OK();
+}
+
+// TODO (SERVER-44787): Remove this overload after 4.4 is released AND
+// disableResumableRangeDeleter has been removed from server parameters.
+Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx) {
+ invariant(_state == kNew);
+ invariant(!opCtx->lockState()->isLocked());
+
+ auto const replCoord = repl::ReplicationCoordinator::get(opCtx);
+ if (replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet) {
+ _sessionCatalogSource = std::make_unique<SessionCatalogMigrationSource>(
+ opCtx,
+ _args.getNss(),
+ ChunkRange(_args.getMinKey(), _args.getMaxKey()),
+ _shardKeyPattern.getKeyPattern());
+
+ // Prime up the session migration source if there are oplog entries to migrate.
+ _sessionCatalogSource->fetchNextOplog(opCtx);
}
+ {
+ // Ignore prepare conflicts when we load ids of currently available documents. This is
+ // acceptable because we will track changes made by prepared transactions at transaction
+ // commit time.
+ auto originalPrepareConflictBehavior = opCtx->recoveryUnit()->getPrepareConflictBehavior();
+
+ ON_BLOCK_EXIT([&] {
+ opCtx->recoveryUnit()->setPrepareConflictBehavior(originalPrepareConflictBehavior);
+ });
+
+ opCtx->recoveryUnit()->setPrepareConflictBehavior(
+ PrepareConflictBehavior::kIgnoreConflicts);
+
+ auto storeCurrentLocsStatus = _storeCurrentLocs(opCtx);
+ if (storeCurrentLocsStatus == ErrorCodes::ChunkTooBig && _forceJumbo) {
+ stdx::lock_guard<Latch> sl(_mutex);
+ _jumboChunkCloneState.emplace();
+ } else if (!storeCurrentLocsStatus.isOK()) {
+ return storeCurrentLocsStatus;
+ }
+ }
+
+ // Tell the recipient shard to start cloning
+ BSONObjBuilder cmdBuilder;
+
+ StartChunkCloneRequest::appendAsCommand(&cmdBuilder,
+ _args.getNss(),
+ _sessionId,
+ _donorConnStr,
+ _args.getFromShardId(),
+ _args.getToShardId(),
+ _args.getMinKey(),
+ _args.getMaxKey(),
+ _shardKeyPattern.toBSON(),
+ _args.getSecondaryThrottle());
+
auto startChunkCloneResponseStatus = _callRecipient(cmdBuilder.obj());
if (!startChunkCloneResponseStatus.isOK()) {
return startChunkCloneResponseStatus.getStatus();
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index 8e34e2033f3..7990615e38f 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -95,6 +95,9 @@ public:
const LogicalSessionId& lsid,
TxnNumber txnNumber) override;
+ // TODO (SERVER-44787): Remove this function after 4.4 is released.
+ Status startClone(OperationContext* opCtx) override;
+
Status awaitUntilCriticalSectionIsAppropriate(OperationContext* opCtx,
Milliseconds maxTimeToWait) override;
diff --git a/src/mongo/db/s/migration_coordinator.cpp b/src/mongo/db/s/migration_coordinator.cpp
index 1e38cc4060c..58eae0dfe11 100644
--- a/src/mongo/db/s/migration_coordinator.cpp
+++ b/src/mongo/db/s/migration_coordinator.cpp
@@ -33,9 +33,11 @@
#include "mongo/db/s/migration_coordinator.h"
+#include "mongo/db/logical_session_id_helpers.h"
#include "mongo/db/s/migration_util.h"
#include "mongo/db/s/range_deletion_task_gen.h"
#include "mongo/logv2/log.h"
+#include "mongo/platform/atomic_word.h"
#include "mongo/util/fail_point.h"
namespace mongo {
@@ -49,11 +51,23 @@ MONGO_FAIL_POINT_DEFINE(hangBeforeSendingAbortDecision);
MONGO_FAIL_POINT_DEFINE(hangBeforeForgettingMigrationAfterCommitDecision);
MONGO_FAIL_POINT_DEFINE(hangBeforeForgettingMigrationAfterAbortDecision);
+namespace {
+
+LogicalSessionId getSystemLogicalSessionId() {
+ static auto lsid = makeSystemLogicalSessionId();
+ return lsid;
+}
+
+TxnNumber getNextTxnNumber() {
+ static AtomicWord<TxnNumber> nextTxnNumber{0};
+ return nextTxnNumber.fetchAndAdd(2);
+}
+
+} // namespace
+
namespace migrationutil {
-MigrationCoordinator::MigrationCoordinator(UUID migrationId,
- MigrationSessionId sessionId,
- LogicalSessionId lsid,
+MigrationCoordinator::MigrationCoordinator(MigrationSessionId sessionId,
ShardId donorShard,
ShardId recipientShard,
NamespaceString collectionNamespace,
@@ -61,9 +75,10 @@ MigrationCoordinator::MigrationCoordinator(UUID migrationId,
ChunkRange range,
ChunkVersion preMigrationChunkVersion,
bool waitForDelete)
- : _migrationInfo(migrationId,
+ : _migrationInfo(UUID::gen(),
std::move(sessionId),
- std::move(lsid),
+ getSystemLogicalSessionId(),
+ getNextTxnNumber(),
std::move(collectionNamespace),
collectionUuid,
std::move(donorShard),
@@ -72,8 +87,23 @@ MigrationCoordinator::MigrationCoordinator(UUID migrationId,
std::move(preMigrationChunkVersion)),
_waitForDelete(waitForDelete) {}
+MigrationCoordinator::MigrationCoordinator(const MigrationCoordinatorDocument& doc)
+ : _migrationInfo(doc) {}
+
MigrationCoordinator::~MigrationCoordinator() = default;
+const UUID& MigrationCoordinator::getMigrationId() const {
+ return _migrationInfo.getId();
+}
+
+const LogicalSessionId& MigrationCoordinator::getLsid() const {
+ return _migrationInfo.getLsid();
+}
+
+TxnNumber MigrationCoordinator::getTxnNumber() const {
+ return _migrationInfo.getTxnNumber();
+}
+
void MigrationCoordinator::startMigration(OperationContext* opCtx) {
LOGV2(
23889, "{logPrefix}Persisting migration coordinator doc", "logPrefix"_attr = _logPrefix());
@@ -154,9 +184,11 @@ SemiFuture<void> MigrationCoordinator::_commitMigrationOnDonorAndRecipient(
"logPrefix"_attr = _logPrefix(),
"migrationInfo_getRecipientShardId"_attr = _migrationInfo.getRecipientShardId(),
"migrationInfo_getLsid"_attr = _migrationInfo.getLsid().toBSON(),
- "TxnNumber_1"_attr = TxnNumber{1});
- migrationutil::advanceTransactionOnRecipient(
- opCtx, _migrationInfo.getRecipientShardId(), _migrationInfo.getLsid(), TxnNumber{1});
+ "TxnNumber"_attr = _migrationInfo.getTxnNumber());
+ migrationutil::advanceTransactionOnRecipient(opCtx,
+ _migrationInfo.getRecipientShardId(),
+ _migrationInfo.getLsid(),
+ _migrationInfo.getTxnNumber());
hangBeforeSendingCommitDecision.pauseWhileSet();
@@ -197,9 +229,11 @@ void MigrationCoordinator::_abortMigrationOnDonorAndRecipient(OperationContext*
"logPrefix"_attr = _logPrefix(),
"migrationInfo_getRecipientShardId"_attr = _migrationInfo.getRecipientShardId(),
"migrationInfo_getLsid"_attr = _migrationInfo.getLsid().toBSON(),
- "TxnNumber_1"_attr = TxnNumber{1});
- migrationutil::advanceTransactionOnRecipient(
- opCtx, _migrationInfo.getRecipientShardId(), _migrationInfo.getLsid(), TxnNumber{1});
+ "TxnNumber"_attr = _migrationInfo.getTxnNumber());
+ migrationutil::advanceTransactionOnRecipient(opCtx,
+ _migrationInfo.getRecipientShardId(),
+ _migrationInfo.getLsid(),
+ _migrationInfo.getTxnNumber());
hangBeforeSendingAbortDecision.pauseWhileSet();
diff --git a/src/mongo/db/s/migration_coordinator.h b/src/mongo/db/s/migration_coordinator.h
index 9b2ccf7e672..94ba7461ad2 100644
--- a/src/mongo/db/s/migration_coordinator.h
+++ b/src/mongo/db/s/migration_coordinator.h
@@ -45,9 +45,7 @@ class MigrationCoordinator {
public:
enum class Decision { kAborted, kCommitted };
- MigrationCoordinator(UUID migrationId,
- MigrationSessionId sessionId,
- LogicalSessionId lsid,
+ MigrationCoordinator(MigrationSessionId sessionId,
ShardId donorShard,
ShardId recipientShard,
NamespaceString collectionNamespace,
@@ -55,6 +53,7 @@ public:
ChunkRange range,
ChunkVersion preMigrationChunkVersion,
bool waitForDelete);
+ MigrationCoordinator(const MigrationCoordinatorDocument& doc);
MigrationCoordinator(const MigrationCoordinator&) = delete;
MigrationCoordinator& operator=(const MigrationCoordinator&) = delete;
MigrationCoordinator(MigrationCoordinator&&) = delete;
@@ -62,6 +61,10 @@ public:
~MigrationCoordinator();
+ const UUID& getMigrationId() const;
+ const LogicalSessionId& getLsid() const;
+ TxnNumber getTxnNumber() const;
+
/**
* Initializes persistent state required to ensure that orphaned ranges are properly handled,
* even after failover, by doing the following:
diff --git a/src/mongo/db/s/migration_coordinator_document.idl b/src/mongo/db/s/migration_coordinator_document.idl
index 5308a4545ae..028cb194582 100644
--- a/src/mongo/db/s/migration_coordinator_document.idl
+++ b/src/mongo/db/s/migration_coordinator_document.idl
@@ -75,6 +75,9 @@ structs:
lsid:
type: LogicalSessionId
description: "The sessionId to use to communicate with the recipient"
+ txnNumber:
+ type: TxnNumber
+ description: "The last txnNumber used to communicate with the recipient"
nss:
type: namespacestring
description: "The namespace of the collection that the chunk belongs to."
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 77382fb15b6..049609000eb 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -262,9 +262,6 @@ Status MigrationSourceManager::startClone() {
auto replCoord = repl::ReplicationCoordinator::get(_opCtx);
auto replEnabled = replCoord->isReplEnabled();
- UUID migrationId = UUID::gen();
- _lsid = makeLogicalSessionId(_opCtx);
-
{
const auto metadata = _getCurrentMetadataAndCheckEpoch();
@@ -298,9 +295,7 @@ Status MigrationSourceManager::startClone() {
if (_enableResumableRangeDeleter) {
_coordinator = std::make_unique<migrationutil::MigrationCoordinator>(
- migrationId,
_cloneDriver->getSessionId(),
- _lsid,
_args.getFromShardId(),
_args.getToShardId(),
getNss(),
@@ -324,11 +319,19 @@ Status MigrationSourceManager::startClone() {
if (_enableResumableRangeDeleter) {
_coordinator->startMigration(_opCtx);
- }
- Status startCloneStatus = _cloneDriver->startClone(_opCtx, migrationId, _lsid, TxnNumber{0});
- if (!startCloneStatus.isOK()) {
- return startCloneStatus;
+ Status startCloneStatus = _cloneDriver->startClone(_opCtx,
+ _coordinator->getMigrationId(),
+ _coordinator->getLsid(),
+ _coordinator->getTxnNumber());
+ if (!startCloneStatus.isOK()) {
+ return startCloneStatus;
+ }
+ } else {
+ Status startCloneStatus = _cloneDriver->startClone(_opCtx);
+ if (!startCloneStatus.isOK()) {
+ return startCloneStatus;
+ }
}
scopedGuard.dismiss();
@@ -765,6 +768,7 @@ void MigrationSourceManager::_cleanup() {
auto newOpCtxPtr = cc().makeOperationContext();
auto newOpCtx = newOpCtxPtr.get();
_cleanupCompleteFuture = _coordinator->completeMigration(newOpCtx);
+ _coordinator.reset();
}
}
diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h
index 09e58eea15e..f335d2d19c9 100644
--- a/src/mongo/db/s/migration_source_manager.h
+++ b/src/mongo/db/s/migration_source_manager.h
@@ -270,7 +270,6 @@ private:
BSONObj _recipientCloneCounts;
boost::optional<CollectionCriticalSection> _critSec;
- LogicalSessionId _lsid;
// Optional future that is populated if the migration succeeds and range deletion is scheduled
// on this node. The future is set when the range deletion completes. Used if the moveChunk was
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index 75b0c8a24c2..593e660ff26 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -611,7 +611,7 @@ void markAsReadyRangeDeletionTaskOnRecipient(OperationContext* opCtx,
void advanceTransactionOnRecipient(OperationContext* opCtx,
const ShardId& recipientId,
const LogicalSessionId& lsid,
- TxnNumber txnNumber) {
+ TxnNumber currentTxnNumber) {
write_ops::Update updateOp(NamespaceString::kServerConfigurationNamespace);
auto queryFilter = BSON("_id"
<< "migrationCoordinatorStats");
@@ -622,9 +622,9 @@ void advanceTransactionOnRecipient(OperationContext* opCtx,
updateEntry.setUpsert(true);
updateOp.setUpdates({updateEntry});
- auto passthroughFields =
- BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority << "lsid"
- << lsid.toBSON() << "txnNumber" << txnNumber);
+ auto passthroughFields = BSON(WriteConcernOptions::kWriteConcernField
+ << WriteConcernOptions::Majority << "lsid" << lsid.toBSON()
+ << "txnNumber" << currentTxnNumber + 1);
sendToRecipient(opCtx, recipientId, updateOp, passthroughFields);
}
@@ -770,16 +770,7 @@ void resumeMigrationCoordinationsOnStepUp(ServiceContext* serviceContext) {
LOGV2(22039, "Recovering migration {doc}", "doc"_attr = doc.toBSON());
// Create a MigrationCoordinator to complete the coordination.
- MigrationCoordinator coordinator(doc.getId(),
- doc.getMigrationSessionId(),
- doc.getLsid(),
- doc.getDonorShardId(),
- doc.getRecipientShardId(),
- doc.getNss(),
- doc.getCollectionUuid(),
- doc.getRange(),
- doc.getPreMigrationChunkVersion(),
- false /* waitForDelete */);
+ MigrationCoordinator coordinator(doc);
if (doc.getDecision()) {
// The decision is already known.