author    Blake Oler <blake.oler@mongodb.com>  2018-06-28 15:05:47 -0400
committer Blake Oler <blake.oler@mongodb.com>  2018-07-02 11:17:00 -0400
commit    e117102282d4d8bc0352429cf67b2d01754f4ad1 (patch)
tree      eb340ccc51bac2677670dc76e80c7422c1f5b443 /src
parent    2f730baf61adae417517c149271576207ce3a210 (diff)
download  mongo-e117102282d4d8bc0352429cf67b2d01754f4ad1.tar.gz
SERVER-25333 Clean up argument passing in the MigrationDestinationManager
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/s/migration_destination_manager.cpp | 139
-rw-r--r--  src/mongo/db/s/migration_destination_manager.h   |  45
2 files changed, 72 insertions, 112 deletions
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index 56addc132a9..674213d8d81 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -344,6 +344,10 @@ Status MigrationDestinationManager::start(OperationContext* opCtx,
_max = max;
_shardKeyPattern = shardKeyPattern;
+ _epoch = epoch;
+
+ _writeConcern = writeConcern;
+
_chunkMarkedPending = false;
_numCloned = 0;
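
This first hunk is the heart of the cleanup: the epoch and write concern that used to be threaded through every helper are captured once, as member state, while `start()` still holds `_mutex` and before the migrate thread exists. A minimal standalone sketch of the pattern, with hypothetical stand-in types (`Epoch`, `WriteConcern`) rather than the real `OID` and `WriteConcernOptions`:

```cpp
#include <mutex>
#include <string>
#include <utility>

using Epoch = std::string;                               // stand-in for OID
struct WriteConcern { bool waitForOtherNodes = false; }; // stand-in for WriteConcernOptions

class DestinationManagerSketch {
public:
    // start() copies the per-migration parameters into members under the
    // mutex; the worker thread is spawned only afterwards, so it may read
    // these fields for the rest of the migration without further locking.
    void start(Epoch epoch, WriteConcern writeConcern) {
        std::lock_guard<std::mutex> lk(_mutex);
        _epoch = std::move(epoch);
        _writeConcern = writeConcern;
    }

private:
    std::mutex _mutex;
    Epoch _epoch;
    WriteConcern _writeConcern;
};
```
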
@@ -364,9 +368,7 @@ Status MigrationDestinationManager::start(OperationContext* opCtx,
_sessionMigration =
stdx::make_unique<SessionCatalogMigrationDestination>(fromShard, *_sessionId);
- _migrateThreadHandle = stdx::thread([this, min, max, shardKeyPattern, epoch, writeConcern]() {
- _migrateThread(min, max, shardKeyPattern, epoch, writeConcern);
- });
+ _migrateThreadHandle = stdx::thread([this]() { _migrateThread(); });
return Status::OK();
}
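
With the parameters stored as members, the migrate thread's closure no longer needs its own copies; capturing `this` alone is enough. A hedged sketch of the capture change, using plain `std::thread` in place of `stdx::thread`:

```cpp
#include <thread>

class WorkerSketch {
public:
    void start(int min, int max) {
        _min = min;  // written before the thread is constructed; the
        _max = max;  // std::thread constructor provides the happens-before edge
        // Before: _handle = std::thread([this, min, max] { run(min, max); });
        _handle = std::thread([this] { run(); });  // after: capture only `this`
    }
    ~WorkerSketch() {
        if (_handle.joinable())
            _handle.join();
    }

private:
    void run() const { /* reads _min and _max */ }

    std::thread _handle;
    int _min = 0;
    int _max = 0;
};
```
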
@@ -495,11 +497,7 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio
return Status::OK();
}
-void MigrationDestinationManager::_migrateThread(BSONObj min,
- BSONObj max,
- BSONObj shardKeyPattern,
- OID epoch,
- WriteConcernOptions writeConcern) {
+void MigrationDestinationManager::_migrateThread() {
Client::initThread("migrateThread");
auto opCtx = Client::getCurrent()->makeOperationContext();
@@ -509,13 +507,13 @@ void MigrationDestinationManager::_migrateThread(BSONObj min,
}
try {
- _migrateDriver(opCtx.get(), min, max, shardKeyPattern, epoch, writeConcern);
+ _migrateDriver(opCtx.get());
} catch (...) {
_setStateFail(str::stream() << "migrate failed: " << redact(exceptionToStatus()));
}
if (getState() != DONE && !MONGO_FAIL_POINT(failMigrationLeaveOrphans)) {
- _forgetPending(opCtx.get(), _nss, epoch, ChunkRange(min, max));
+ _forgetPending(opCtx.get(), ChunkRange(_min, _max));
}
stdx::lock_guard<stdx::mutex> lk(_mutex);
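
`_migrateThread()` keeps its original shape: run the driver, convert any exception into a failed state, and, unless the migration finished cleanly (or a fail point asks to leave orphans), forget the pending range so its documents get cleaned up. A skeletal sketch of that error-handling flow, with hypothetical names:

```cpp
#include <exception>
#include <iostream>

enum class State { Ready, Done, Fail };

class MigrateThreadSketch {
public:
    void migrateThread() {
        try {
            migrateDriver();  // may throw anywhere
        } catch (const std::exception& ex) {
            _state = State::Fail;
            std::cerr << "migrate failed: " << ex.what() << '\n';
        }
        if (_state != State::Done) {
            forgetPending();  // schedule deletion of half-migrated documents
        }
    }

private:
    void migrateDriver() { _state = State::Done; }
    void forgetPending() {}

    State _state = State::Ready;
};
```
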
@@ -524,26 +522,21 @@ void MigrationDestinationManager::_migrateThread(BSONObj min,
_isActiveCV.notify_all();
}
-void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
- const BSONObj& min,
- const BSONObj& max,
- const BSONObj& shardKeyPattern,
- const OID& epoch,
- const WriteConcernOptions& writeConcern) {
+void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) {
invariant(isActive());
invariant(_sessionId);
invariant(_scopedReceiveChunk);
- invariant(!min.isEmpty());
- invariant(!max.isEmpty());
+ invariant(!_min.isEmpty());
+ invariant(!_max.isEmpty());
auto const serviceContext = opCtx->getServiceContext();
- log() << "Starting receiving end of migration of chunk " << redact(min) << " -> " << redact(max)
- << " for collection " << _nss.ns() << " from " << _fromShard << " at epoch "
- << epoch.toString() << " with session id " << *_sessionId;
+ log() << "Starting receiving end of migration of chunk " << redact(_min) << " -> "
+ << redact(_max) << " for collection " << _nss.ns() << " from " << _fromShard
+ << " at epoch " << _epoch.toString() << " with session id " << *_sessionId;
MoveTimingHelper timing(
- opCtx, "to", _nss.ns(), min, max, 6 /* steps */, &_errmsg, ShardId(), ShardId());
+ opCtx, "to", _nss.ns(), _min, _max, 6 /* steps */, &_errmsg, ShardId(), ShardId());
const auto initialState = getState();
@@ -664,7 +657,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
uassert(ErrorCodes::InvalidUUID,
str::stream()
<< "Cannot receive chunk "
- << ChunkRange(min, max).toString()
+ << ChunkRange(_min, _max).toString()
<< " for collection "
<< _nss.ns()
<< " because we already have an identically named collection with UUID "
@@ -732,8 +725,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
// 2. Synchronously delete any data which might have been left orphaned in the range
// being moved, and wait for completion
- const ChunkRange footprint(min, max);
- auto notification = _notePending(opCtx, _nss, epoch, footprint);
+ const ChunkRange footprint(_min, _max);
+ auto notification = _notePending(opCtx, footprint);
// Wait for the range deletion to report back
if (!notification.waitStatus(opCtx).isOK()) {
_setStateFail(redact(notification.waitStatus(opCtx).reason()));
@@ -741,7 +734,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
}
// Wait for any other, overlapping queued deletions to drain
- auto status = CollectionShardingState::waitForClean(opCtx, _nss, epoch, footprint);
+ auto status = CollectionShardingState::waitForClean(opCtx, _nss, _epoch, footprint);
if (!status.isOK()) {
_setStateFail(redact(status.reason()));
return;
@@ -782,9 +775,9 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
BSONObj localDoc;
if (willOverrideLocalId(opCtx,
_nss,
- min,
- max,
- shardKeyPattern,
+ _min,
+ _max,
+ _shardKeyPattern,
autoColl.getDb(),
docToClone,
&localDoc)) {
@@ -804,12 +797,12 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
_numCloned++;
_clonedBytes += docToClone.objsize();
}
- if (writeConcern.shouldWaitForOtherNodes()) {
+ if (_writeConcern.shouldWaitForOtherNodes()) {
repl::ReplicationCoordinator::StatusAndDuration replStatus =
repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
opCtx,
repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
- writeConcern);
+ _writeConcern);
if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
warning() << "secondaryThrottle on, but doc insert timed out; "
"continuing";
@@ -875,7 +868,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
break;
}
- _applyMigrateOp(opCtx, _nss, min, max, shardKeyPattern, mods, &lastOpApplied);
+ _applyMigrateOp(opCtx, mods, &lastOpApplied);
const int maxIterations = 3600 * 50;
@@ -888,7 +881,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
return;
}
- if (opReplicatedEnough(opCtx, lastOpApplied, writeConcern))
+ if (opReplicatedEnough(opCtx, lastOpApplied, _writeConcern))
break;
if (i > 100) {
@@ -915,7 +908,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
log() << "Waiting for replication to catch up before entering critical section";
auto awaitReplicationResult = repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
- opCtx, lastOpApplied, writeConcern);
+ opCtx, lastOpApplied, _writeConcern);
uassertStatusOKWithContext(awaitReplicationResult.status,
awaitReplicationResult.status.codeString());
@@ -951,8 +944,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
auto mods = res.response;
- if (mods["size"].number() > 0 &&
- _applyMigrateOp(opCtx, _nss, min, max, shardKeyPattern, mods, &lastOpApplied)) {
+ if (mods["size"].number() > 0 && _applyMigrateOp(opCtx, mods, &lastOpApplied)) {
continue;
}
@@ -965,7 +957,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
// 1) The from side has told us that it has locked writes (COMMIT_START)
// 2) We've checked at least one more time for un-transmitted mods
if (getState() == COMMIT_START && transferAfterCommit == true) {
- if (_flushPendingWrites(opCtx, _nss.ns(), min, max, lastOpApplied, writeConcern)) {
+ if (_flushPendingWrites(opCtx, lastOpApplied)) {
break;
}
}
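
The exit condition spelled out in the comment — donor in COMMIT_START, plus at least one further check for untransmitted mods, plus a successful flush to the write concern — can be read as a small loop skeleton. A hedged sketch (the three callbacks are hypothetical stand-ins for the real helpers):

```cpp
#include <functional>

enum class State { Steady, CommitStart };

// Keep draining mod batches; only break once the donor has entered the
// commit phase, at least one fetch has happened after that point, and the
// applied ops have been flushed to the required write concern.
// (The real loop also bounds its iterations and bails out on failure states.)
void drainMods(const std::function<State()>& getState,
               const std::function<bool()>& fetchAndApplyMods,  // true if batch non-empty
               const std::function<bool()>& flushPendingWrites) {
    bool transferAfterCommit = false;
    for (;;) {
        if (getState() == State::CommitStart) {
            transferAfterCommit = true;  // noted before the next fetch
        }
        if (fetchAndApplyMods()) {
            continue;  // applied something; immediately try for the next batch
        }
        if (getState() == State::CommitStart && transferAfterCommit &&
            flushPendingWrites()) {
            break;  // everything transferred and replicated
        }
    }
}
```
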
@@ -997,10 +989,6 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
}
bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& min,
- const BSONObj& max,
- const BSONObj& shardKeyPattern,
const BSONObj& xfer,
repl::OpTime* lastOpApplied) {
invariant(lastOpApplied);
@@ -1011,12 +999,12 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx,
if (xfer["deleted"].isABSONObj()) {
boost::optional<Helpers::RemoveSaver> rs;
if (serverGlobalParams.moveParanoia) {
- rs.emplace("moveChunk", nss.ns(), "removedDuring");
+ rs.emplace("moveChunk", _nss.ns(), "removedDuring");
}
BSONObjIterator i(xfer["deleted"].Obj());
while (i.more()) {
- AutoGetCollection autoColl(opCtx, nss, MODE_IX);
+ AutoGetCollection autoColl(opCtx, _nss, MODE_IX);
uassert(ErrorCodes::ConflictingOperationInProgress,
str::stream() << "Collection " << _nss.ns()
<< " was dropped in the middle of the migration",
@@ -1026,8 +1014,8 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx,
// Do not apply delete if doc does not belong to the chunk being migrated
BSONObj fullObj;
- if (Helpers::findById(opCtx, autoColl.getDb(), nss.ns(), id, fullObj)) {
- if (!isInRange(fullObj, min, max, shardKeyPattern)) {
+ if (Helpers::findById(opCtx, autoColl.getDb(), _nss.ns(), id, fullObj)) {
+ if (!isInRange(fullObj, _min, _max, _shardKeyPattern)) {
if (MONGO_FAIL_POINT(failMigrationReceivedOutOfRangeOperation)) {
MONGO_UNREACHABLE;
}
@@ -1041,7 +1029,7 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx,
deleteObjects(opCtx,
autoColl.getCollection(),
- nss,
+ _nss,
id,
true /* justOne */,
false /* god */,
@@ -1056,7 +1044,7 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx,
if (xfer["reload"].isABSONObj()) {
BSONObjIterator i(xfer["reload"].Obj());
while (i.more()) {
- AutoGetCollection autoColl(opCtx, nss, MODE_IX);
+ AutoGetCollection autoColl(opCtx, _nss, MODE_IX);
uassert(ErrorCodes::ConflictingOperationInProgress,
str::stream() << "Collection " << _nss.ns()
<< " was dropped in the middle of the migration",
@@ -1065,7 +1053,7 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx,
BSONObj updatedDoc = i.next().Obj();
// do not apply insert/update if doc does not belong to the chunk being migrated
- if (!isInRange(updatedDoc, min, max, shardKeyPattern)) {
+ if (!isInRange(updatedDoc, _min, _max, _shardKeyPattern)) {
if (MONGO_FAIL_POINT(failMigrationReceivedOutOfRangeOperation)) {
MONGO_UNREACHABLE;
}
@@ -1074,10 +1062,10 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx,
BSONObj localDoc;
if (willOverrideLocalId(opCtx,
- nss,
- min,
- max,
- shardKeyPattern,
+ _nss,
+ _min,
+ _max,
+ _shardKeyPattern,
autoColl.getDb(),
updatedDoc,
&localDoc)) {
@@ -1091,7 +1079,7 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx,
}
// We are in write lock here, so sure we aren't killing
- Helpers::upsert(opCtx, nss.ns(), updatedDoc, true);
+ Helpers::upsert(opCtx, _nss.ns(), updatedDoc, true);
*lastOpApplied = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
didAnything = true;
@@ -1102,41 +1090,35 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx,
}
bool MigrationDestinationManager::_flushPendingWrites(OperationContext* opCtx,
- const std::string& ns,
- BSONObj min,
- BSONObj max,
- const repl::OpTime& lastOpApplied,
- const WriteConcernOptions& writeConcern) {
- if (!opReplicatedEnough(opCtx, lastOpApplied, writeConcern)) {
+ const repl::OpTime& lastOpApplied) {
+ if (!opReplicatedEnough(opCtx, lastOpApplied, _writeConcern)) {
repl::OpTime op(lastOpApplied);
- OCCASIONALLY log() << "migrate commit waiting for a majority of slaves for '" << ns << "' "
- << redact(min) << " -> " << redact(max) << " waiting for: " << op;
+ OCCASIONALLY log() << "migrate commit waiting for a majority of slaves for '" << _nss.ns()
+ << "' " << redact(_min) << " -> " << redact(_max)
+ << " waiting for: " << op;
return false;
}
- log() << "migrate commit succeeded flushing to secondaries for '" << ns << "' " << redact(min)
- << " -> " << redact(max);
+ log() << "migrate commit succeeded flushing to secondaries for '" << _nss.ns() << "' "
+ << redact(_min) << " -> " << redact(_max);
return true;
}
CollectionShardingState::CleanupNotification MigrationDestinationManager::_notePending(
- OperationContext* opCtx,
- NamespaceString const& nss,
- OID const& epoch,
- ChunkRange const& range) {
+ OperationContext* opCtx, ChunkRange const& range) {
- AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X);
- auto css = CollectionShardingState::get(opCtx, nss);
+ AutoGetCollection autoColl(opCtx, _nss, MODE_IX, MODE_X);
+ auto css = CollectionShardingState::get(opCtx, _nss);
auto metadata = css->getMetadata(opCtx);
// This can currently happen because drops aren't synchronized with in-migrations. The idea for
// checking this here is that in the future we shouldn't have this problem.
- if (!metadata || metadata->getCollVersion().epoch() != epoch) {
+ if (!metadata || metadata->getCollVersion().epoch() != _epoch) {
return Status{ErrorCodes::StaleShardVersion,
str::stream() << "not noting chunk " << redact(range.toString())
<< " as pending because the epoch of "
- << nss.ns()
+ << _nss.ns()
<< " changed"};
}
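
`_notePending()` and `_forgetPending()` both guard on the epoch captured at `start()`: if the collection's current epoch differs, the collection was dropped and recreated mid-migration, and the pending-range bookkeeping must not touch the new incarnation. The guard reduces to a comparison like this sketch (simplified, hypothetical types):

```cpp
#include <optional>
#include <string>

struct CollectionMetadataSketch {
    std::string collVersionEpoch;  // stand-in for getCollVersion().epoch()
};

// Returns true only if sharding metadata still exists and still belongs to
// the same collection incarnation the migration started against.
bool epochStillMatches(const std::optional<CollectionMetadataSketch>& metadata,
                       const std::string& epochAtStart) {
    return metadata.has_value() && metadata->collVersionEpoch == epochAtStart;
}
```
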
@@ -1144,31 +1126,28 @@ CollectionShardingState::CleanupNotification MigrationDestinationManager::_noteP
auto notification = css->beginReceive(range);
if (notification.ready() && !notification.waitStatus(opCtx).isOK()) {
return notification.waitStatus(opCtx).withContext(
- str::stream() << "Collection " << nss.ns() << " range " << redact(range.toString())
+ str::stream() << "Collection " << _nss.ns() << " range " << redact(range.toString())
<< " migration aborted");
}
return notification;
}
-void MigrationDestinationManager::_forgetPending(OperationContext* opCtx,
- const NamespaceString& nss,
- OID const& epoch,
- ChunkRange const& range) {
+void MigrationDestinationManager::_forgetPending(OperationContext* opCtx, ChunkRange const& range) {
if (!_chunkMarkedPending) { // (no lock needed, only the migrate thread looks at this.)
return; // no documents can have been moved in, so there is nothing to clean up.
}
UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X);
- auto css = CollectionShardingState::get(opCtx, nss);
+ AutoGetCollection autoColl(opCtx, _nss, MODE_IX, MODE_X);
+ auto css = CollectionShardingState::get(opCtx, _nss);
auto metadata = css->getMetadata(opCtx);
// This can currently happen because drops aren't synchronized with in-migrations. The idea for
// checking this here is that in the future we shouldn't have this problem.
- if (!metadata || metadata->getCollVersion().epoch() != epoch) {
+ if (!metadata || metadata->getCollVersion().epoch() != _epoch) {
log() << "no need to forget pending chunk " << redact(range.toString())
- << " because the epoch for " << nss.ns() << " changed";
+ << " because the epoch for " << _nss.ns() << " changed";
return;
}
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index 7fa514e107c..eedf65b14fb 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -144,33 +144,13 @@ private:
/**
* Thread which drives the migration apply process on the recipient side.
*/
- void _migrateThread(BSONObj min,
- BSONObj max,
- BSONObj shardKeyPattern,
- OID epoch,
- WriteConcernOptions writeConcern);
-
- void _migrateDriver(OperationContext* opCtx,
- const BSONObj& min,
- const BSONObj& max,
- const BSONObj& shardKeyPattern,
- const OID& epoch,
- const WriteConcernOptions& writeConcern);
-
- bool _applyMigrateOp(OperationContext* opCtx,
- const NamespaceString& ns,
- const BSONObj& min,
- const BSONObj& max,
- const BSONObj& shardKeyPattern,
- const BSONObj& xfer,
- repl::OpTime* lastOpApplied);
-
- bool _flushPendingWrites(OperationContext* opCtx,
- const std::string& ns,
- BSONObj min,
- BSONObj max,
- const repl::OpTime& lastOpApplied,
- const WriteConcernOptions& writeConcern);
+ void _migrateThread();
+
+ void _migrateDriver(OperationContext* opCtx);
+
+ bool _applyMigrateOp(OperationContext* opCtx, const BSONObj& xfer, repl::OpTime* lastOpApplied);
+
+ bool _flushPendingWrites(OperationContext* opCtx, const repl::OpTime& lastOpApplied);
/**
* Remembers a chunk range between 'min' and 'max' as a range which will have data migrated
@@ -178,16 +158,13 @@ private:
* it schedules deletion of any documents in the range, so that process must be seen to be
* complete before migrating any new documents in.
*/
- CollectionShardingState::CleanupNotification _notePending(OperationContext*,
- NamespaceString const&,
- OID const&,
- ChunkRange const&);
+ CollectionShardingState::CleanupNotification _notePending(OperationContext*, ChunkRange const&);
/**
* Stops tracking a chunk range between 'min' and 'max' that previously was having data
* migrated into it, and schedules deletion of any such documents already migrated in.
*/
- void _forgetPending(OperationContext*, NamespaceString const&, OID const&, ChunkRange const&);
+ void _forgetPending(OperationContext*, ChunkRange const&);
/**
* Checks whether the MigrationDestinationManager is currently handling a migration by checking
@@ -217,6 +194,10 @@ private:
BSONObj _max;
BSONObj _shardKeyPattern;
+ OID _epoch;
+
+ WriteConcernOptions _writeConcern;
+
// Set to true once we have accepted the chunk as pending into our metadata. Used so that on
// failure we can perform the appropriate cleanup.
bool _chunkMarkedPending{false};
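
Taken together, the header now encodes the invariant the whole patch relies on: `_min`, `_max`, `_shardKeyPattern`, `_epoch`, and `_writeConcern` are written once in `start()` and treated as read-only by the migrate thread, so every private helper can shed its parameter list. A condensed, hypothetical sketch of the resulting shape:

```cpp
#include <string>
#include <thread>
#include <utility>

class MigrationDestinationManagerSketch {
public:
    void start(std::string min, std::string max, std::string epoch) {
        _min = std::move(min);      // all per-migration inputs become members,
        _max = std::move(max);      // written once here...
        _epoch = std::move(epoch);
        _thread = std::thread([this] { _migrateThread(); });
    }
    ~MigrationDestinationManagerSketch() {
        if (_thread.joinable())
            _thread.join();
    }

private:
    void _migrateThread() { _migrateDriver(); }       // ...and the helpers'
    void _migrateDriver() { /* reads the members */ } // signatures shrink to nothing

    std::thread _thread;
    std::string _min, _max, _epoch;
};
```
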