summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Taskov <alex.taskov@mongodb.com>2020-02-19 18:45:49 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-02-20 14:19:15 +0000
commit742fa16dc379a281ad3c10e8c196d5ab2de9f70e (patch)
tree9e4c4700e921ca2cab58e9909ec764667f6e6ce9 /src
parent32f47846d78a4fdae9564b7ebb442d53e737d845 (diff)
downloadmongo-742fa16dc379a281ad3c10e8c196d5ab2de9f70e.tar.gz
SERVER-45744 Migration coordinator recovery task should bump the txnNumber on the session used to send _recvChunkStart
create mode 100644 jstests/sharding/bump_transaction_prevents_extra_deletion_task_write.js
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source.h7
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.h10
-rw-r--r--src/mongo/db/s/migration_coordinator.cpp17
-rw-r--r--src/mongo/db/s/migration_coordinator.h11
-rw-r--r--src/mongo/db/s/migration_coordinator_document.idl19
-rw-r--r--src/mongo/db/s/migration_destination_manager.cpp3
-rw-r--r--src/mongo/db/s/migration_session_id.h9
-rw-r--r--src/mongo/db/s/migration_source_manager.cpp4
-rw-r--r--src/mongo/db/s/migration_util.cpp46
-rw-r--r--src/mongo/db/s/migration_util.h9
10 files changed, 114 insertions, 21 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source.h b/src/mongo/db/s/migration_chunk_cloner_source.h
index cb4ebb53a29..00e17e8fe3a 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source.h
@@ -35,6 +35,7 @@
namespace mongo {
class BSONObj;
+class MigrationSessionId;
class OperationContext;
class Status;
class Timestamp;
@@ -161,6 +162,12 @@ public:
const repl::OpTime& opTime,
const repl::OpTime& preImageOpTime) = 0;
+ /**
+ * Returns the migration session id associated with this cloner, so stale sessions can be
+ * disambiguated.
+ */
+ virtual const MigrationSessionId& getSessionId() const = 0;
+
protected:
MigrationChunkClonerSource();
};
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index d5f56e196ed..8e34e2033f3 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -119,16 +119,12 @@ public:
const repl::OpTime& opTime,
const repl::OpTime& preImageOpTime) override;
- // Legacy cloner specific functionality
-
- /**
- * Returns the migration session id associated with this cloner, so stale sessions can be
- * disambiguated.
- */
- const MigrationSessionId& getSessionId() const {
+ const MigrationSessionId& getSessionId() const override {
return _sessionId;
}
+ // Legacy cloner specific functionality
+
/**
* Returns the rollback ID recorded at the beginning of session migration. If the underlying
* SessionCatalogMigrationSource does not exist, that means this node is running as a standalone
diff --git a/src/mongo/db/s/migration_coordinator.cpp b/src/mongo/db/s/migration_coordinator.cpp
index 597ca085a45..aaacbc145c7 100644
--- a/src/mongo/db/s/migration_coordinator.cpp
+++ b/src/mongo/db/s/migration_coordinator.cpp
@@ -52,6 +52,8 @@ MONGO_FAIL_POINT_DEFINE(hangBeforeForgettingMigrationAfterAbortDecision);
namespace migrationutil {
MigrationCoordinator::MigrationCoordinator(UUID migrationId,
+ MigrationSessionId sessionId,
+ LogicalSessionId lsid,
ShardId donorShard,
ShardId recipientShard,
NamespaceString collectionNamespace,
@@ -60,6 +62,8 @@ MigrationCoordinator::MigrationCoordinator(UUID migrationId,
ChunkVersion preMigrationChunkVersion,
bool waitForDelete)
: _migrationInfo(migrationId,
+ std::move(sessionId),
+ std::move(lsid),
std::move(collectionNamespace),
collectionUuid,
std::move(donorShard),
@@ -108,6 +112,7 @@ boost::optional<SemiFuture<void>> MigrationCoordinator::completeMigration(Operat
<< " to self and to recipient";
boost::optional<SemiFuture<void>> cleanupCompleteFuture = boost::none;
+
switch (*_decision) {
case Decision::kAborted:
_abortMigrationOnDonorAndRecipient(opCtx);
@@ -118,7 +123,9 @@ boost::optional<SemiFuture<void>> MigrationCoordinator::completeMigration(Operat
hangBeforeForgettingMigrationAfterCommitDecision.pauseWhileSet();
break;
}
+
forgetMigration(opCtx);
+
return cleanupCompleteFuture;
}
@@ -129,6 +136,11 @@ SemiFuture<void> MigrationCoordinator::_commitMigrationOnDonorAndRecipient(
LOG(0) << _logPrefix() << "Making commit decision durable";
migrationutil::persistCommitDecision(opCtx, _migrationInfo.getId());
+ LOG(0) << _logPrefix() << "Bumping transaction for " << _migrationInfo.getRecipientShardId()
+ << " lsid: " << _migrationInfo.getLsid().toBSON() << " txn: " << TxnNumber{1};
+ migrationutil::advanceTransactionOnRecipient(
+ opCtx, _migrationInfo.getRecipientShardId(), _migrationInfo.getLsid(), TxnNumber{1});
+
hangBeforeSendingCommitDecision.pauseWhileSet();
LOG(0) << _logPrefix() << "Deleting range deletion task on recipient";
@@ -156,6 +168,11 @@ void MigrationCoordinator::_abortMigrationOnDonorAndRecipient(OperationContext*
LOG(0) << _logPrefix() << "Making abort decision durable";
migrationutil::persistAbortDecision(opCtx, _migrationInfo.getId());
+ LOG(0) << _logPrefix() << "Bumping transaction for " << _migrationInfo.getRecipientShardId()
+ << " lsid: " << _migrationInfo.getLsid().toBSON() << " txn: " << TxnNumber{1};
+ migrationutil::advanceTransactionOnRecipient(
+ opCtx, _migrationInfo.getRecipientShardId(), _migrationInfo.getLsid(), TxnNumber{1});
+
hangBeforeSendingAbortDecision.pauseWhileSet();
LOG(0) << _logPrefix() << "Deleting range deletion task on donor";
diff --git a/src/mongo/db/s/migration_coordinator.h b/src/mongo/db/s/migration_coordinator.h
index be7e9074bf0..9b2ccf7e672 100644
--- a/src/mongo/db/s/migration_coordinator.h
+++ b/src/mongo/db/s/migration_coordinator.h
@@ -29,6 +29,7 @@
#pragma once
+#include "mongo/db/logical_session_id.h"
#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/migration_coordinator_document_gen.h"
#include "mongo/s/catalog/type_chunk.h"
@@ -45,6 +46,8 @@ public:
enum class Decision { kAborted, kCommitted };
MigrationCoordinator(UUID migrationId,
+ MigrationSessionId sessionId,
+ LogicalSessionId lsid,
ShardId donorShard,
ShardId recipientShard,
NamespaceString collectionNamespace,
@@ -63,10 +66,10 @@ public:
* Initializes persistent state required to ensure that orphaned ranges are properly handled,
* even after failover, by doing the following:
*
- * 1) Inserts a document into the local config.migrationCoordinators with the recipientId and
- * waits for majority writeConcern. 2) Inserts a document into the local config.rangeDeletions
- * with the collectionUUID, range to delete, and "pending: true" and waits for majority
- * writeConcern.
+ * 1) Inserts a document into the local config.migrationCoordinators with the lsid and
+ * recipientId and waits for majority writeConcern. 2) Inserts a document into the local
+ * config.rangeDeletions with the collectionUUID, range to delete, and "pending: true" and waits
+ * for majority writeConcern.
*/
void startMigration(OperationContext* opCtx);
diff --git a/src/mongo/db/s/migration_coordinator_document.idl b/src/mongo/db/s/migration_coordinator_document.idl
index c1001c0c748..5308a4545ae 100644
--- a/src/mongo/db/s/migration_coordinator_document.idl
+++ b/src/mongo/db/s/migration_coordinator_document.idl
@@ -31,12 +31,15 @@
global:
cpp_namespace: "mongo"
+ cpp_includes:
+ - "mongo/db/s/migration_session_id.h"
imports:
- "mongo/idl/basic_types.idl"
- "mongo/s/sharding_types.idl"
- "mongo/s/chunk_range.idl"
- "mongo/s/chunk_version.idl"
+ - "mongo/db/logical_session_id.idl"
enums:
Decision:
@@ -46,6 +49,16 @@ enums:
kCommitted: "committed"
kAborted: "aborted"
+types:
+ MigrationSessionId:
+ bson_serialization_type: string
+ description: "The migration session id is the legacy unique identifier for a particular
+ moveChunk command and is exchanged as part of all communication between the
+ donor and recipient shards."
+ cpp_type: "mongo::MigrationSessionId"
+ deserializer: "mongo::MigrationSessionId::fromString"
+ serializer: "mongo::MigrationSessionId::toString"
+
structs:
migrationCoordinatorDocument:
description: "Represents an in-progress migration on the migration donor."
@@ -56,6 +69,12 @@ structs:
type: uuid
description: "A unique identifier for the migration."
cpp_name: id
+ migrationSessionId:
+ type: MigrationSessionId
+ description: "A legacy unique identifier for the migration session."
+ lsid:
+ type: LogicalSessionId
+ description: "The sessionId to use to communicate with the recipient"
nss:
type: namespacestring
description: "The namespace of the collection that the chunk belongs to."
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index 8124f36a16f..48edd0d463c 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -218,6 +218,7 @@ MONGO_FAIL_POINT_DEFINE(migrateThreadHangAtStep6);
MONGO_FAIL_POINT_DEFINE(failMigrationOnRecipient);
MONGO_FAIL_POINT_DEFINE(failMigrationReceivedOutOfRangeOperation);
+MONGO_FAIL_POINT_DEFINE(hangOnRecipientFailure);
} // namespace
@@ -798,6 +799,8 @@ void MigrationDestinationManager::_migrateThread() {
_migrateDriver(opCtx);
}
} catch (...) {
+ log() << "In catch handler";
+ hangOnRecipientFailure.pauseWhileSet();
_setStateFail(str::stream() << "migrate failed: " << redact(exceptionToStatus()));
}
diff --git a/src/mongo/db/s/migration_session_id.h b/src/mongo/db/s/migration_session_id.h
index 923427822de..e3ad9deebdc 100644
--- a/src/mongo/db/s/migration_session_id.h
+++ b/src/mongo/db/s/migration_session_id.h
@@ -44,10 +44,12 @@ class StatusWith;
/**
* Encapsulates the logic for generating, parsing and comparing migration sessions. The migration
* session id is a unique identifier for a particular moveChunk command and is exchanged as part of
- * all communication between the source and donor shards.
+ * all communication between the donor and recipient shards.
*/
class MigrationSessionId {
public:
+ MigrationSessionId() = default;
+
/**
* Constructs a new migration session identifier with the following format:
* DonorId_RecipientId_UniqueIdentifier
@@ -78,6 +80,11 @@ public:
std::string toString() const;
+ static MigrationSessionId fromString(StringData sessionId) {
+ MigrationSessionId id(sessionId.toString());
+ return id;
+ }
+
private:
explicit MigrationSessionId(std::string sessionId);
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 1e7319a92e9..505622e074d 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -300,6 +300,8 @@ Status MigrationSourceManager::startClone() {
if (_enableResumableRangeDeleter) {
_coordinator = std::make_unique<migrationutil::MigrationCoordinator>(
migrationId,
+ _cloneDriver->getSessionId(),
+ _lsid,
_args.getFromShardId(),
_args.getToShardId(),
getNss(),
@@ -765,8 +767,6 @@ void MigrationSourceManager::_cleanup() {
auto newOpCtx = newOpCtxPtr.get();
_cleanupCompleteFuture = _coordinator->completeMigration(newOpCtx);
}
-
- LogicalSessionCache::get(_opCtx)->endSessions({_lsid});
}
_state = kDone;
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index 770dbdd0ea9..c7eb2183d85 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -83,17 +83,21 @@ const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
WriteConcernOptions::kNoTimeout);
template <typename Cmd>
-void sendToRecipient(OperationContext* opCtx, const ShardId& recipientId, const Cmd& cmd) {
+void sendToRecipient(OperationContext* opCtx,
+ const ShardId& recipientId,
+ const Cmd& cmd,
+ const BSONObj& passthroughFields = {}) {
auto recipientShard =
uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, recipientId));
- LOGV2_DEBUG(22023, 1, "Sending request {cmd} to recipient.", "cmd"_attr = cmd.toBSON({}));
+ auto cmdBSON = cmd.toBSON(passthroughFields);
+ LOGV2_DEBUG(22023, 1, "Sending request {cmd} to recipient.", "cmd"_attr = cmdBSON);
auto response = recipientShard->runCommandWithFixedRetryAttempts(
opCtx,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- "config",
- cmd.toBSON(BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority)),
+ cmd.getDbName().toString(),
+ cmdBSON,
Shard::RetryPolicy::kIdempotent);
uassertStatusOK(Shard::CommandResponse::getEffectiveStatus(response));
@@ -416,7 +420,10 @@ void deleteRangeDeletionTaskOnRecipient(OperationContext* opCtx,
false /*multi*/);
deleteOp.setDeletes({query});
- sendToRecipient(opCtx, recipientId, deleteOp);
+ sendToRecipient(opCtx,
+ recipientId,
+ deleteOp,
+ BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority));
}
void deleteRangeDeletionTaskLocally(OperationContext* opCtx,
@@ -444,7 +451,30 @@ void markAsReadyRangeDeletionTaskOnRecipient(OperationContext* opCtx,
updateEntry.setUpsert(false);
updateOp.setUpdates({updateEntry});
- sendToRecipient(opCtx, recipientId, updateOp);
+ sendToRecipient(opCtx,
+ recipientId,
+ updateOp,
+ BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority));
+}
+
+void advanceTransactionOnRecipient(OperationContext* opCtx,
+ const ShardId& recipientId,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber) {
+ write_ops::Update updateOp(NamespaceString::kServerConfigurationNamespace);
+ auto queryFilter = BSON("_id"
+ << "migrationCoordinatorStats");
+ auto updateModification = write_ops::UpdateModification(BSON("$inc" << BSON("count" << 1)));
+
+ write_ops::UpdateOpEntry updateEntry(queryFilter, updateModification);
+ updateEntry.setMulti(false);
+ updateEntry.setUpsert(true);
+ updateOp.setUpdates({updateEntry});
+
+ auto passthroughFields =
+ BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority << "lsid"
+ << lsid.toBSON() << "txnNumber" << txnNumber);
+ sendToRecipient(opCtx, recipientId, updateOp, passthroughFields);
}
void markAsReadyRangeDeletionTaskLocally(OperationContext* opCtx, const UUID& migrationId) {
@@ -610,7 +640,7 @@ void resumeMigrationCoordinationsOnStepUp(ServiceContext* serviceContext) {
// Wait for the latest OpTime to be majority committed to ensure any decision that is
// read is on the true branch of history.
// Note (Esha): I don't think this is strictly required for correctness, but it is
- // is difficult to reason about, and being pessimistic by waiting for the decision to be
+ // difficult to reason about, and being pessimistic by waiting for the decision to be
// majority committed does not cost much, since stepup should be rare. It *is* required
// that this node ensure a decision that it itself recovers is majority committed. For
// example, it is possible that this node is a stale primary, and the true primary has
@@ -647,6 +677,8 @@ void resumeMigrationCoordinationsOnStepUp(ServiceContext* serviceContext) {
// Create a MigrationCoordinator to complete the coordination.
MigrationCoordinator coordinator(doc.getId(),
+ doc.getMigrationSessionId(),
+ doc.getLsid(),
doc.getDonorShardId(),
doc.getRecipientShardId(),
doc.getNss(),
diff --git a/src/mongo/db/s/migration_util.h b/src/mongo/db/s/migration_util.h
index 1b47486159f..951b55e305e 100644
--- a/src/mongo/db/s/migration_util.h
+++ b/src/mongo/db/s/migration_util.h
@@ -161,6 +161,15 @@ void deleteRangeDeletionTaskOnRecipient(OperationContext* opCtx,
const UUID& migrationId);
/**
+ * Advances the optime for the current transaction by performing a write operation as a retryable
+ * write. This is to prevent a write of the deletion task once the decision has been recorded.
+ */
+void advanceTransactionOnRecipient(OperationContext* opCtx,
+ const ShardId& recipientId,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber);
+
+/**
* Removes the 'pending' flag from the range deletion task document with the specified id from
* config.rangeDeletions and waits for majority write concern. This marks the range as ready for
* deletion.