summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaolo Polato <paolo.polato@mongodb.com>2021-05-20 10:25:06 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-05-20 10:47:49 +0000
commit40e1693cb180fd17ec6fa0e1b32acc21769c0b85 (patch)
tree375a1c45870c16b2bbd92b6bd150d00268b0aeb9
parent81bec8d5b1e03b2e31e38e202b65c3df62845a6d (diff)
downloadmongo-40e1693cb180fd17ec6fa0e1b32acc21769c0b85.tar.gz
SERVER-56307 Allow the donor to enter the critical section when the untransferred mods are within a convergence threshold.
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp59
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.h3
-rw-r--r--src/mongo/db/s/migration_destination_manager.cpp28
-rw-r--r--src/mongo/db/s/start_chunk_clone_request.h3
4 files changed, 86 insertions, 7 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index e02a0ee23e3..d65d7b6854d 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -41,11 +41,13 @@
#include "mongo/db/dbhelpers.h"
#include "mongo/db/exec/plan_stage.h"
#include "mongo/db/exec/working_set_common.h"
+#include "mongo/db/index/index_access_method.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_process.h"
#include "mongo/db/s/start_chunk_clone_request.h"
+#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
#include "mongo/executor/remote_command_request.h"
#include "mongo/executor/remote_command_response.h"
@@ -63,6 +65,12 @@
namespace mongo {
namespace {
+/**
+ * The maximum percentage of untrasferred chunk mods at the end of a catch up iteration
+ * that may be deferred to the next phase of the migration protocol (where new writes get blocked).
+ */
+MONGO_EXPORT_SERVER_PARAMETER(maxCatchUpPercentageBeforeBlockingWrites, int, 10);
+
const char kRecvChunkStatus[] = "_recvChunkStatus";
const char kRecvChunkCommit[] = "_recvChunkCommit";
const char kRecvChunkAbort[] = "_recvChunkAbort";
@@ -296,6 +304,32 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate(
return Status::OK();
}
+ bool supportsCriticalSectionDuringCatchUp = false;
+ if (auto featureSupportedField =
+ res[StartChunkCloneRequest::kSupportsCriticalSectionDuringCatchUp]) {
+ if (!featureSupportedField.booleanSafe()) {
+ return {ErrorCodes::Error(563070),
+ str::stream()
+ << "Illegal value for "
+ << StartChunkCloneRequest::kSupportsCriticalSectionDuringCatchUp};
+ }
+ supportsCriticalSectionDuringCatchUp = true;
+ }
+
+ if (res["state"].String() == "catchup" && supportsCriticalSectionDuringCatchUp) {
+ int64_t estimatedUntransferredModsSize = _deleted.size() * _averageObjectIdSize +
+ _reload.size() * _averageObjectSizeForCloneLocs;
+ auto estimatedUntransferredChunkPercentage =
+ (std::min(_args.getMaxChunkSizeBytes(), estimatedUntransferredModsSize) * 100) /
+ _args.getMaxChunkSizeBytes();
+ if (estimatedUntransferredChunkPercentage <
+ maxCatchUpPercentageBeforeBlockingWrites.load()) {
+ // The recipient is sufficiently caught-up with the writes on the donor.
+ // Block writes, so that it can drain everything.
+ return Status::OK();
+ }
+ }
+
if (res["state"].String() == "fail") {
return {ErrorCodes::OperationFailed,
str::stream() << "Data transfer error: " << res["errmsg"].str()};
@@ -609,11 +643,11 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC
// Allow multiKey based on the invariant that shard keys must be single-valued. Therefore, any
// multi-key index prefixed by shard key cannot be multikey over the shard key fields.
- IndexDescriptor* const idx =
+ IndexDescriptor* const shardKeyIdx =
collection->getIndexCatalog()->findShardKeyPrefixedIndex(opCtx,
_shardKeyPattern.toBSON(),
false); // requireSingleKey
- if (!idx) {
+ if (!shardKeyIdx) {
return {ErrorCodes::IndexNotFound,
str::stream() << "can't find index with prefix " << _shardKeyPattern.toBSON()
<< " in storeCurrentLocs for "
@@ -634,7 +668,7 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC
_deleteNotifyExec = std::move(statusWithDeleteNotificationPlanExecutor.getValue());
// Assume both min and max non-empty, append MinKey's to make them fit chosen index
- const KeyPattern kp(idx->keyPattern());
+ const KeyPattern kp(shardKeyIdx->keyPattern());
BSONObj min = Helpers::toKeyFormat(kp.extendRangeBound(_args.getMinKey(), false));
BSONObj max = Helpers::toKeyFormat(kp.extendRangeBound(_args.getMaxKey(), false));
@@ -643,7 +677,7 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC
// being queued and will migrate in the 'transferMods' stage.
auto exec = InternalPlanner::indexScan(opCtx,
collection,
- idx,
+ shardKeyIdx,
min,
max,
BoundInclusion::kIncludeStartKeyOnly,
@@ -698,6 +732,19 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC
const uint64_t collectionAverageObjectSize = collection->averageObjectSize(opCtx);
+ uint64_t averageObjectIdSize = 0;
+ const uint64_t defaultObjectIdSize = OID::kOIDSize;
+ if (totalRecs > 0) {
+ const auto indexCatalog = collection->getIndexCatalog();
+ const auto idIdx = indexCatalog->findIdIndex(opCtx);
+ if (!idIdx) {
+ return {ErrorCodes::IndexNotFound,
+ str::stream() << "can't find index '_id' in storeCurrentLocs for "
+ << _args.getNss().ns()};
+ }
+ averageObjectIdSize = indexCatalog->getIndex(idIdx)->getSpaceUsedBytes(opCtx) / totalRecs;
+ }
+
if (isLargeChunk) {
return {
ErrorCodes::ChunkTooBig,
@@ -719,8 +766,8 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC
}
stdx::lock_guard<stdx::mutex> lk(_mutex);
- _averageObjectSizeForCloneLocs = collectionAverageObjectSize + 12;
-
+ _averageObjectSizeForCloneLocs = collectionAverageObjectSize + defaultObjectIdSize;
+ _averageObjectIdSize = std::max(averageObjectIdSize, defaultObjectIdSize);
return Status::OK();
}
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index 0e8c53feab1..aa9b5271119 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -252,6 +252,9 @@ private:
// pre-allocation (initial clone).
uint64_t _averageObjectSizeForCloneLocs{0};
+ // The estimated average object _id size during the clone phase.
+ uint64_t _averageObjectIdSize{0};
+
// List of _id of documents that were modified that must be re-cloned (xfer mods)
std::list<BSONObj> _reload;
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index cf0a076e3f4..c2a19f9cf2b 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -69,6 +69,9 @@
#include "mongo/util/scopeguard.h"
namespace mongo {
+
+constexpr StringData StartChunkCloneRequest::kSupportsCriticalSectionDuringCatchUp;
+
namespace {
const auto getMigrationDestinationManager =
@@ -295,6 +298,7 @@ void MigrationDestinationManager::report(BSONObjBuilder& b,
b.append("min", _min);
b.append("max", _max);
b.append("shardKeyPattern", _shardKeyPattern);
+ b.append(StartChunkCloneRequest::kSupportsCriticalSectionDuringCatchUp, true);
b.append("state", stateToString(_state));
@@ -462,6 +466,25 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio
stdx::unique_lock<stdx::mutex> lock(_mutex);
+ const auto convergenceTimeout =
+ Shard::kDefaultConfigCommandTimeout + Shard::kDefaultConfigCommandTimeout / 4;
+
+ // The donor may have started the commit while the recipient is still busy processing
+ // the last batch of mods sent in the catch up phase. Allow some time for synching up.
+ auto deadline = Date_t::now() + convergenceTimeout;
+
+ while (_state == CATCHUP) {
+ if (stdx::cv_status::timeout ==
+ _stateChangedCV.wait_until(lock, deadline.toSystemTimePoint())) {
+ return {ErrorCodes::CommandFailed,
+ str::stream() << "startCommit timed out waiting for the catch up completion. "
+ << "Sender's session is "
+ << sessionId.toString()
+ << ". Current session is "
+ << (_sessionId ? _sessionId->toString() : "none.")};
+ }
+ }
+
if (_state != STEADY) {
return {ErrorCodes::CommandFailed,
str::stream() << "Migration startCommit attempted when not in STEADY state."
@@ -489,7 +512,9 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio
_state = COMMIT_START;
_stateChangedCV.notify_all();
- auto const deadline = Date_t::now() + Seconds(30);
+ // Assigning a timeout slightly higher than the one used for network requests to the config
+ // server. Enough time to retry at least once in case of network failures (SERVER-51397).
+ deadline = Date_t::now() + convergenceTimeout;
while (_sessionId) {
if (stdx::cv_status::timeout ==
_isActiveCV.wait_until(lock, deadline.toSystemTimePoint())) {
@@ -913,6 +938,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) {
const auto& mods = res.response;
if (mods["size"].number() == 0) {
+ // There are no more pending modifications to be applied. End the catchup phase
break;
}
diff --git a/src/mongo/db/s/start_chunk_clone_request.h b/src/mongo/db/s/start_chunk_clone_request.h
index c3d63fc807b..4a3e359372d 100644
--- a/src/mongo/db/s/start_chunk_clone_request.h
+++ b/src/mongo/db/s/start_chunk_clone_request.h
@@ -49,6 +49,9 @@ class StatusWith;
*/
class StartChunkCloneRequest {
public:
+ static constexpr auto kSupportsCriticalSectionDuringCatchUp =
+ "supportsCriticalSectionDuringCatchUp"_sd;
+
/**
* Parses the input command and produces a request corresponding to its arguments.
*/