author    Blake Oler <blake.oler@mongodb.com>  2018-06-28 15:05:47 -0400
committer Cheahuychou Mao <cheahuychou.mao@mongodb.com>  2018-08-21 14:48:49 -0400
commit    8816bdcfe2a6889c40f7130559f246ebe31e3569 (patch)
tree      d1c19f25ec69d6a750e8e06550daddf94db24e9f
parent    32add2ea30a3858c013c38eac49fab5758a62173 (diff)
download  mongo-8816bdcfe2a6889c40f7130559f246ebe31e3569.tar.gz
SERVER-25333 Clean up argument passing in the MigrationDestinationManager
(cherry picked from commit e117102282d4d8bc0352429cf67b2d01754f4ad1)
-rw-r--r-- src/mongo/db/s/migration_destination_manager.cpp | 178
-rw-r--r-- src/mongo/db/s/migration_destination_manager.h   |  45
2 files changed, 98 insertions(+), 125 deletions(-)
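
The commit replaces the long parameter lists threaded through the recipient-side migration methods with member variables (_min, _max, _shardKeyPattern, _epoch, _writeConcern) that start() records once, so _migrateThread() and everything it calls reads state directly off the manager. Below is a minimal standalone sketch of that pattern (illustrative names, not MongoDB code); the members are written before the worker thread is spawned, so thread creation provides the happens-before ordering the thread's reads rely on:

```cpp
#include <string>
#include <thread>

class ChunkReceiver {
public:
    // start() captures the per-migration arguments into members once...
    void start(std::string min, std::string max) {
        _min = std::move(min);
        _max = std::move(max);
        // ...so the thread entry point captures only `this` and takes no
        // arguments, mirroring stdx::thread([this]() { _migrateThread(); }).
        _worker = std::thread([this] { _run(); });
    }

    void join() { _worker.join(); }

private:
    void _run() {
        // Reads _min/_max directly instead of receiving copies by value.
    }

    std::string _min;
    std::string _max;
    std::thread _worker;
};

int main() {
    ChunkReceiver r;
    r.start("{x: 0}", "{x: 100}");
    r.join();
    return 0;
}
```
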
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index cf6294788f1..8d93049a908 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -345,6 +345,10 @@ Status MigrationDestinationManager::start(OperationContext* opCtx,
_max = max;
_shardKeyPattern = shardKeyPattern;
+ _epoch = epoch;
+
+ _writeConcern = writeConcern;
+
_chunkMarkedPending = false;
_numCloned = 0;
@@ -365,9 +369,7 @@ Status MigrationDestinationManager::start(OperationContext* opCtx,
_sessionMigration =
stdx::make_unique<SessionCatalogMigrationDestination>(fromShard, *_sessionId);
- _migrateThreadHandle = stdx::thread([this, min, max, shardKeyPattern, epoch, writeConcern]() {
- _migrateThread(min, max, shardKeyPattern, epoch, writeConcern);
- });
+ _migrateThreadHandle = stdx::thread([this]() { _migrateThread(); });
return Status::OK();
}
@@ -496,11 +498,7 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio
return Status::OK();
}
-void MigrationDestinationManager::_migrateThread(BSONObj min,
- BSONObj max,
- BSONObj shardKeyPattern,
- OID epoch,
- WriteConcernOptions writeConcern) {
+void MigrationDestinationManager::_migrateThread() {
Client::initThread("migrateThread");
auto opCtx = Client::getCurrent()->makeOperationContext();
@@ -510,13 +508,13 @@ void MigrationDestinationManager::_migrateThread(BSONObj min,
}
try {
- _migrateDriver(opCtx.get(), min, max, shardKeyPattern, epoch, writeConcern);
+ _migrateDriver(opCtx.get());
} catch (...) {
_setStateFail(str::stream() << "migrate failed: " << redact(exceptionToStatus()));
}
if (getState() != DONE && !MONGO_FAIL_POINT(failMigrationLeaveOrphans)) {
- _forgetPending(opCtx.get(), _nss, epoch, ChunkRange(min, max));
+ _forgetPending(opCtx.get(), ChunkRange(_min, _max));
}
stdx::lock_guard<stdx::mutex> lk(_mutex);
@@ -525,26 +523,21 @@ void MigrationDestinationManager::_migrateThread(BSONObj min,
_isActiveCV.notify_all();
}
-void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
- const BSONObj& min,
- const BSONObj& max,
- const BSONObj& shardKeyPattern,
- const OID& epoch,
- const WriteConcernOptions& writeConcern) {
+void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) {
invariant(isActive());
invariant(_sessionId);
invariant(_scopedReceiveChunk);
- invariant(!min.isEmpty());
- invariant(!max.isEmpty());
+ invariant(!_min.isEmpty());
+ invariant(!_max.isEmpty());
auto const serviceContext = opCtx->getServiceContext();
- log() << "Starting receiving end of migration of chunk " << redact(min) << " -> " << redact(max)
- << " for collection " << _nss.ns() << " from " << _fromShard << " at epoch "
- << epoch.toString() << " with session id " << *_sessionId;
+ log() << "Starting receiving end of migration of chunk " << redact(_min) << " -> "
+ << redact(_max) << " for collection " << _nss.ns() << " from " << _fromShard
+ << " at epoch " << _epoch.toString() << " with session id " << *_sessionId;
MoveTimingHelper timing(
- opCtx, "to", _nss.ns(), min, max, 6 /* steps */, &_errmsg, ShardId(), ShardId());
+ opCtx, "to", _nss.ns(), _min, _max, 6 /* steps */, &_errmsg, ShardId(), ShardId());
const auto initialState = getState();
@@ -664,22 +657,20 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
donorUUID.emplace(UUID::parse(donorOptions));
}
- if (collection->uuid() != donorUUID) {
- _setStateFailWarn(
+ uassert(ErrorCodes::InvalidUUID,
str::stream()
- << "Cannot receive chunk "
- << redact(ChunkRange(min, max).toString())
- << " for collection "
- << _nss.ns()
- << " because we already have an identically named collection with UUID "
- << (collection->uuid() ? collection->uuid()->toString() : "(none)")
- << ", which differs from the donor's UUID "
- << (donorUUID ? donorUUID->toString() : "(none)")
- << ". Manually drop the collection on this shard if it contains data from a "
- "previous incarnation of "
- << _nss.ns());
- return;
- }
+ << "Cannot receive chunk "
+ << ChunkRange(_min, _max).toString()
+ << " for collection "
+ << _nss.ns()
+ << " because we already have an identically named collection with UUID "
+ << (collection->uuid() ? collection->uuid()->toString() : "(none)")
+ << ", which differs from the donor's UUID "
+ << (donorUUID ? donorUUID->toString() : "(none)")
+ << ". Manually drop the collection on this shard if it contains data from "
+ "a previous incarnation of "
+ << _nss.ns(),
+ collection->uuid() == donorUUID);
} else {
// We do not have a collection by this name. Create the collection with the donor's
// options.
@@ -740,8 +731,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
// 2. Synchronously delete any data which might have been left orphaned in the range
// being moved, and wait for completion
- const ChunkRange footprint(min, max);
- auto notification = _notePending(opCtx, _nss, epoch, footprint);
+ const ChunkRange footprint(_min, _max);
+ auto notification = _notePending(opCtx, footprint);
// Wait for the range deletion to report back
if (!notification.waitStatus(opCtx).isOK()) {
_setStateFail(redact(notification.waitStatus(opCtx).reason()));
@@ -749,7 +740,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
}
// Wait for any other, overlapping queued deletions to drain
- auto status = CollectionShardingState::waitForClean(opCtx, _nss, epoch, footprint);
+ auto status = CollectionShardingState::waitForClean(opCtx, _nss, _epoch, footprint);
if (!status.isOK()) {
_setStateFail(redact(status.reason()));
return;
@@ -807,12 +798,12 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
_numCloned += batchNumCloned;
_clonedBytes += batchClonedBytes;
}
- if (writeConcern.shouldWaitForOtherNodes()) {
+ if (_writeConcern.shouldWaitForOtherNodes()) {
repl::ReplicationCoordinator::StatusAndDuration replStatus =
repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
opCtx,
repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
- writeConcern);
+ _writeConcern);
if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
warning() << "secondaryThrottle on, but doc insert timed out; "
"continuing";
@@ -877,7 +868,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
break;
}
- _applyMigrateOp(opCtx, _nss, min, max, shardKeyPattern, mods, &lastOpApplied);
+ _applyMigrateOp(opCtx, mods, &lastOpApplied);
const int maxIterations = 3600 * 50;
@@ -890,7 +881,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
return;
}
- if (opReplicatedEnough(opCtx, lastOpApplied, writeConcern))
+ if (opReplicatedEnough(opCtx, lastOpApplied, _writeConcern))
break;
if (i > 100) {
@@ -917,7 +908,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
log() << "Waiting for replication to catch up before entering critical section";
auto awaitReplicationResult = repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
- opCtx, lastOpApplied, writeConcern);
+ opCtx, lastOpApplied, _writeConcern);
uassertStatusOKWithContext(awaitReplicationResult.status,
awaitReplicationResult.status.codeString());
@@ -953,8 +944,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
auto mods = res.response;
- if (mods["size"].number() > 0 &&
- _applyMigrateOp(opCtx, _nss, min, max, shardKeyPattern, mods, &lastOpApplied)) {
+ if (mods["size"].number() > 0 && _applyMigrateOp(opCtx, mods, &lastOpApplied)) {
continue;
}
@@ -967,7 +957,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
// 1) The from side has told us that it has locked writes (COMMIT_START)
// 2) We've checked at least one more time for un-transmitted mods
if (getState() == COMMIT_START && transferAfterCommit == true) {
- if (_flushPendingWrites(opCtx, _nss.ns(), min, max, lastOpApplied, writeConcern)) {
+ if (_flushPendingWrites(opCtx, lastOpApplied)) {
break;
}
}
@@ -999,10 +989,6 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
}
bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& min,
- const BSONObj& max,
- const BSONObj& shardKeyPattern,
const BSONObj& xfer,
repl::OpTime* lastOpApplied) {
repl::OpTime dummy;
@@ -1013,20 +999,25 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx,
bool didAnything = false;
if (xfer["deleted"].isABSONObj()) {
- Lock::DBLock dlk(opCtx, nss.db(), MODE_IX);
- Helpers::RemoveSaver rs("moveChunk", nss.ns(), "removedDuring");
+ boost::optional<Helpers::RemoveSaver> rs;
+ if (serverGlobalParams.moveParanoia) {
+ rs.emplace("moveChunk", _nss.ns(), "removedDuring");
+ }
BSONObjIterator i(xfer["deleted"].Obj()); // deleted documents
while (i.more()) {
- Lock::CollectionLock clk(opCtx->lockState(), nss.ns(), MODE_X);
- OldClientContext ctx(opCtx, nss.ns());
+ AutoGetCollection autoColl(opCtx, _nss, MODE_IX);
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ str::stream() << "Collection " << _nss.ns()
+ << " was dropped in the middle of the migration",
+ autoColl.getCollection());
BSONObj id = i.next().Obj();
// do not apply delete if doc does not belong to the chunk being migrated
BSONObj fullObj;
- if (Helpers::findById(opCtx, ctx.db(), nss.ns(), id, fullObj)) {
- if (!isInRange(fullObj, min, max, shardKeyPattern)) {
+ if (Helpers::findById(opCtx, autoColl.getDb(), _nss.ns(), id, fullObj)) {
+ if (!isInRange(fullObj, _min, _max, _shardKeyPattern)) {
if (MONGO_FAIL_POINT(failMigrationReceivedOutOfRangeOperation)) {
MONGO_UNREACHABLE;
}
@@ -1035,12 +1026,12 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx,
}
if (serverGlobalParams.moveParanoia) {
- rs.goingToDelete(fullObj).transitional_ignore();
+ rs->goingToDelete(fullObj).transitional_ignore();
}
deleteObjects(opCtx,
- ctx.db() ? ctx.db()->getCollection(opCtx, nss) : nullptr,
- nss,
+ autoColl.getCollection(),
+ _nss,
id,
true /* justOne */,
false /* god */,
@@ -1054,12 +1045,16 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx,
if (xfer["reload"].isABSONObj()) { // modified documents (insert/update)
BSONObjIterator i(xfer["reload"].Obj());
while (i.more()) {
- OldClientWriteContext cx(opCtx, nss.ns());
+ AutoGetCollection autoColl(opCtx, _nss, MODE_IX);
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ str::stream() << "Collection " << _nss.ns()
+ << " was dropped in the middle of the migration",
+ autoColl.getCollection());
BSONObj updatedDoc = i.next().Obj();
// do not apply insert/update if doc does not belong to the chunk being migrated
- if (!isInRange(updatedDoc, min, max, shardKeyPattern)) {
+ if (!isInRange(updatedDoc, _min, _max, _shardKeyPattern)) {
if (MONGO_FAIL_POINT(failMigrationReceivedOutOfRangeOperation)) {
MONGO_UNREACHABLE;
}
@@ -1067,8 +1062,14 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx,
}
BSONObj localDoc;
- if (willOverrideLocalId(
- opCtx, nss, min, max, shardKeyPattern, cx.db(), updatedDoc, &localDoc)) {
+ if (willOverrideLocalId(opCtx,
+ _nss,
+ _min,
+ _max,
+ _shardKeyPattern,
+ autoColl.getDb(),
+ updatedDoc,
+ &localDoc)) {
const std::string errMsg = str::stream()
<< "cannot migrate chunk, local document " << redact(localDoc)
<< " has same _id as reloaded remote document " << redact(updatedDoc);
@@ -1079,7 +1080,7 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx,
}
// We are in write lock here, so sure we aren't killing
- Helpers::upsert(opCtx, nss.ns(), updatedDoc, true);
+ Helpers::upsert(opCtx, _nss.ns(), updatedDoc, true);
*lastOpApplied = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
didAnything = true;
@@ -1090,41 +1091,35 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx,
}
bool MigrationDestinationManager::_flushPendingWrites(OperationContext* opCtx,
- const std::string& ns,
- BSONObj min,
- BSONObj max,
- const repl::OpTime& lastOpApplied,
- const WriteConcernOptions& writeConcern) {
- if (!opReplicatedEnough(opCtx, lastOpApplied, writeConcern)) {
+ const repl::OpTime& lastOpApplied) {
+ if (!opReplicatedEnough(opCtx, lastOpApplied, _writeConcern)) {
repl::OpTime op(lastOpApplied);
- OCCASIONALLY log() << "migrate commit waiting for a majority of slaves for '" << ns << "' "
- << redact(min) << " -> " << redact(max) << " waiting for: " << op;
+ OCCASIONALLY log() << "migrate commit waiting for a majority of slaves for '" << _nss.ns()
+ << "' " << redact(_min) << " -> " << redact(_max)
+ << " waiting for: " << op;
return false;
}
- log() << "migrate commit succeeded flushing to secondaries for '" << ns << "' " << redact(min)
- << " -> " << redact(max);
+ log() << "migrate commit succeeded flushing to secondaries for '" << _nss.ns() << "' "
+ << redact(_min) << " -> " << redact(_max);
return true;
}
CollectionShardingState::CleanupNotification MigrationDestinationManager::_notePending(
- OperationContext* opCtx,
- NamespaceString const& nss,
- OID const& epoch,
- ChunkRange const& range) {
+ OperationContext* opCtx, ChunkRange const& range) {
- AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X);
- auto css = CollectionShardingState::get(opCtx, nss);
+ AutoGetCollection autoColl(opCtx, _nss, MODE_IX, MODE_X);
+ auto css = CollectionShardingState::get(opCtx, _nss);
auto metadata = css->getMetadata(opCtx);
// This can currently happen because drops aren't synchronized with in-migrations. The idea for
// checking this here is that in the future we shouldn't have this problem.
- if (!metadata || metadata->getCollVersion().epoch() != epoch) {
+ if (!metadata || metadata->getCollVersion().epoch() != _epoch) {
return Status{ErrorCodes::StaleShardVersion,
str::stream() << "not noting chunk " << redact(range.toString())
<< " as pending because the epoch of "
- << nss.ns()
+ << _nss.ns()
<< " changed"};
}
@@ -1132,31 +1127,28 @@ CollectionShardingState::CleanupNotification MigrationDestinationManager::_noteP
auto notification = css->beginReceive(range);
if (notification.ready() && !notification.waitStatus(opCtx).isOK()) {
return notification.waitStatus(opCtx).withContext(
- str::stream() << "Collection " << nss.ns() << " range " << redact(range.toString())
+ str::stream() << "Collection " << _nss.ns() << " range " << redact(range.toString())
<< " migration aborted");
}
return notification;
}
-void MigrationDestinationManager::_forgetPending(OperationContext* opCtx,
- const NamespaceString& nss,
- OID const& epoch,
- ChunkRange const& range) {
+void MigrationDestinationManager::_forgetPending(OperationContext* opCtx, ChunkRange const& range) {
if (!_chunkMarkedPending) { // (no lock needed, only the migrate thread looks at this.)
return; // no documents can have been moved in, so there is nothing to clean up.
}
UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X);
- auto css = CollectionShardingState::get(opCtx, nss);
+ AutoGetCollection autoColl(opCtx, _nss, MODE_IX, MODE_X);
+ auto css = CollectionShardingState::get(opCtx, _nss);
auto metadata = css->getMetadata(opCtx);
// This can currently happen because drops aren't synchronized with in-migrations. The idea for
// checking this here is that in the future we shouldn't have this problem.
- if (!metadata || metadata->getCollVersion().epoch() != epoch) {
+ if (!metadata || metadata->getCollVersion().epoch() != _epoch) {
log() << "no need to forget pending chunk " << redact(range.toString())
- << " because the epoch for " << nss.ns() << " changed";
+ << " because the epoch for " << _nss.ns() << " changed";
return;
}
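
One detail worth noting in the _applyMigrateOp hunks above: the Helpers::RemoveSaver, which archives documents removed during a migration, is now held in a boost::optional and constructed only when serverGlobalParams.moveParanoia is set, instead of being built unconditionally. A standalone sketch of that construct-on-demand pattern (using std::optional and a stand-in saver type; names are illustrative):

```cpp
#include <iostream>
#include <optional>
#include <string>

// Stand-in for Helpers::RemoveSaver: archives documents before deletion.
struct DocArchiver {
    explicit DocArchiver(std::string tag) : _tag(std::move(tag)) {}
    void goingToDelete(const std::string& doc) {
        std::cout << "[" << _tag << "] archiving: " << doc << "\n";
    }
    std::string _tag;
};

bool gMoveParanoia = true;  // stand-in for serverGlobalParams.moveParanoia

void applyDelete(const std::string& doc) {
    std::optional<DocArchiver> archiver;
    if (gMoveParanoia) {
        archiver.emplace("removedDuring");  // constructed only when enabled
    }
    if (archiver) {
        archiver->goingToDelete(doc);  // later use is guarded by the same flag
    }
    // ... actually delete the document here ...
}

int main() {
    applyDelete("{_id: 1}");
    return 0;
}
```
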
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index 3e3e3a4beb8..73d57649bff 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -144,33 +144,13 @@ private:
/**
* Thread which drives the migration apply process on the recipient side.
*/
- void _migrateThread(BSONObj min,
- BSONObj max,
- BSONObj shardKeyPattern,
- OID epoch,
- WriteConcernOptions writeConcern);
-
- void _migrateDriver(OperationContext* opCtx,
- const BSONObj& min,
- const BSONObj& max,
- const BSONObj& shardKeyPattern,
- const OID& epoch,
- const WriteConcernOptions& writeConcern);
-
- bool _applyMigrateOp(OperationContext* opCtx,
- const NamespaceString& ns,
- const BSONObj& min,
- const BSONObj& max,
- const BSONObj& shardKeyPattern,
- const BSONObj& xfer,
- repl::OpTime* lastOpApplied);
-
- bool _flushPendingWrites(OperationContext* opCtx,
- const std::string& ns,
- BSONObj min,
- BSONObj max,
- const repl::OpTime& lastOpApplied,
- const WriteConcernOptions& writeConcern);
+ void _migrateThread();
+
+ void _migrateDriver(OperationContext* opCtx);
+
+ bool _applyMigrateOp(OperationContext* opCtx, const BSONObj& xfer, repl::OpTime* lastOpApplied);
+
+ bool _flushPendingWrites(OperationContext* opCtx, const repl::OpTime& lastOpApplied);
/**
* Remembers a chunk range between 'min' and 'max' as a range which will have data migrated
@@ -178,16 +158,13 @@ private:
* it schedules deletion of any documents in the range, so that process must be seen to be
* complete before migrating any new documents in.
*/
- CollectionShardingState::CleanupNotification _notePending(OperationContext*,
- NamespaceString const&,
- OID const&,
- ChunkRange const&);
+ CollectionShardingState::CleanupNotification _notePending(OperationContext*, ChunkRange const&);
/**
* Stops tracking a chunk range between 'min' and 'max' that previously was having data
* migrated into it, and schedules deletion of any such documents already migrated in.
*/
- void _forgetPending(OperationContext*, NamespaceString const&, OID const&, ChunkRange const&);
+ void _forgetPending(OperationContext*, ChunkRange const&);
/**
* Checks whether the MigrationDestinationManager is currently handling a migration by checking
@@ -217,6 +194,10 @@ private:
BSONObj _max;
BSONObj _shardKeyPattern;
+ OID _epoch;
+
+ WriteConcernOptions _writeConcern;
+
// Set to true once we have accepted the chunk as pending into our metadata. Used so that on
// failure we can perform the appropriate cleanup.
bool _chunkMarkedPending{false};
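
The other recurring change in this commit is to error handling: explicit "_setStateFailWarn(...); return;" sequences and the new dropped-collection checks become uassert() calls, so failures propagate as exceptions up to the single try/catch in _migrateThread(), which funnels them into _setStateFail(). A hedged sketch of that centralization (plain std::runtime_error standing in for MongoDB's assertion machinery; names are illustrative):

```cpp
#include <sstream>
#include <stdexcept>
#include <string>

// Stand-in for uassert(): throw on a failed precondition instead of
// setting an error state and returning early.
void require(bool ok, const std::string& msg) {
    if (!ok) throw std::runtime_error(msg);
}

void migrateDriver(bool collectionStillExists) {
    std::ostringstream msg;
    msg << "Collection was dropped in the middle of the migration";
    require(collectionStillExists, msg.str());
    // ... continue applying migration ops ...
}

void migrateThread() {
    try {
        migrateDriver(false);
    } catch (const std::exception& e) {
        // Single choke point, mirroring _setStateFail("migrate failed: ...")
        // in the catch block of _migrateThread().
        (void)e;
    }
}

int main() {
    migrateThread();
    return 0;
}
```
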