Diffstat (limited to 'src/mongo/db/s/migration_destination_manager.cpp')
-rw-r--r-- src/mongo/db/s/migration_destination_manager.cpp | 216
1 file changed, 83 insertions(+), 133 deletions(-)
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index dcb35a57af5..f1e0ffd925b 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -51,6 +51,7 @@
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/s/collection_metadata.h"
+#include "mongo/db/s/collection_range_deleter.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/migration_util.h"
#include "mongo/db/s/move_timing_helper.h"
@@ -59,6 +60,7 @@
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/stdx/chrono.h"
+#include "mongo/util/concurrency/notification.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
@@ -216,6 +218,24 @@ void MigrationDestinationManager::setState(State newState) {
_state = newState;
}
+void MigrationDestinationManager::setStateFail(std::string msg) {
+ log() << msg;
+ {
+ stdx::lock_guard<stdx::mutex> sl(_mutex);
+ _errmsg = std::move(msg);
+ _state = FAIL;
+ }
+}
+
+void MigrationDestinationManager::setStateFailWarn(std::string msg) {
+ warning() << msg;
+ {
+ stdx::lock_guard<stdx::mutex> sl(_mutex);
+ _errmsg = std::move(msg);
+ _state = FAIL;
+ }
+}
+
bool MigrationDestinationManager::isActive() const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
return _isActive_inlock();
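The two new helpers above replace the repeated pattern of logging, locking _mutex, recording _errmsg, and flipping _state to FAIL that previously appeared at every failure site. A minimal standalone sketch of that pattern, using illustrative stand-in names rather than the real MigrationDestinationManager members:

    // Sketch of the mutex-guarded failure transition that setStateFail()/setStateFailWarn()
    // encapsulate. The class, enum, and logging below are stand-ins, not MongoDB types.
    #include <iostream>
    #include <mutex>
    #include <string>

    class MigrationStateHolder {
    public:
        enum State { READY, FAIL };

        void setStateFail(std::string msg) {
            std::cerr << msg << '\n';                // log outside the lock, like log()/warning()
            std::lock_guard<std::mutex> lk(_mutex);  // then record message and state atomically
            _errmsg = std::move(msg);
            _state = FAIL;
        }

    private:
        std::mutex _mutex;
        std::string _errmsg;
        State _state = READY;
    };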
@@ -301,7 +321,6 @@ Status MigrationDestinationManager::start(const NamespaceString& nss,
_sessionId = sessionId;
_scopedRegisterReceiveChunk = std::move(scopedRegisterReceiveChunk);
-
// TODO: If we are here, the migrate thread must have completed, otherwise _active above
// would be false, so this would never block. There is no better place with the current
// implementation where to join the thread.
@@ -378,8 +397,9 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio
while (_sessionId) {
if (stdx::cv_status::timeout ==
_isActiveCV.wait_until(lock, deadline.toSystemTimePoint())) {
+ _errmsg = str::stream() << "startCommit timed out waiting, " << _sessionId->toString();
_state = FAIL;
- return {ErrorCodes::CommandFailed, "startCommit timed out waiting "};
+ return {ErrorCodes::CommandFailed, _errmsg};
}
}
if (_state != DONE) {
@@ -405,29 +425,13 @@ void MigrationDestinationManager::_migrateThread(BSONObj min,
_migrateDriver(
opCtx.get(), min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern);
} catch (std::exception& e) {
- {
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- _state = FAIL;
- _errmsg = e.what();
- }
-
- log() << "migrate failed: " << redact(e.what());
+ setStateFail(str::stream() << "migrate failed: " << redact(e.what()));
} catch (...) {
- {
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- _state = FAIL;
- _errmsg = "UNKNOWN ERROR";
- }
-
- log() << "migrate failed with unknown exception";
+ setStateFail("migrate failed with unknown exception: UNKNOWN ERROR");
}
if (getState() != DONE) {
- // Unprotect the range if needed/possible on unsuccessful TO migration
- Status status = _forgetPending(opCtx.get(), _nss, min, max, epoch);
- if (!status.isOK()) {
- warning() << "Failed to remove pending range" << redact(causedBy(status));
- }
+ _forgetPending(opCtx.get(), _nss, epoch, ChunkRange(min, max));
}
stdx::lock_guard<stdx::mutex> lk(_mutex);
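With those helpers in place, _migrateThread funnels both exception paths through setStateFail() and, whenever the migration did not reach DONE, runs the pending-range cleanup. A self-contained sketch of that control flow; all names below are placeholders, not the real thread entry point:

    // Sketch: map any exception escaping the migrate driver to FAIL, then clean up
    // the pending range whenever the migration did not finish successfully.
    #include <exception>
    #include <iostream>
    #include <stdexcept>
    #include <string>

    enum class State { STEADY, DONE, FAIL };
    static State state = State::STEADY;

    static void setStateFail(const std::string& msg) {
        std::cerr << msg << '\n';
        state = State::FAIL;
    }

    static void migrateDriver() { throw std::runtime_error("simulated clone failure"); }
    static void forgetPending() { /* undo the pending-range registration */ }

    int main() {
        try {
            migrateDriver();
            state = State::DONE;
        } catch (const std::exception& e) {
            setStateFail(std::string("migrate failed: ") + e.what());
        } catch (...) {
            setStateFail("migrate failed with unknown exception");
        }
        if (state != State::DONE) {
            forgetPending();  // analogous to _forgetPending() in the patch
        }
        return state == State::DONE ? 0 : 1;
    }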
@@ -492,10 +496,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
OldClientWriteContext ctx(opCtx, _nss.ns());
if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, _nss)) {
- _errmsg = str::stream() << "Not primary during migration: " << _nss.ns()
- << ": checking if collection exists";
- warning() << _errmsg;
- setState(FAIL);
+ setStateFailWarn(str::stream() << "Not primary during migration: " << _nss.ns()
+ << ": checking if collection exists");
return;
}
@@ -539,18 +541,14 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
OldClientContext ctx(opCtx, _nss.ns());
if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, _nss)) {
- _errmsg = str::stream() << "Not primary during migration: " << _nss.ns();
- warning() << _errmsg;
- setState(FAIL);
+ setStateFailWarn(str::stream() << "Not primary during migration: " << _nss.ns());
return;
}
Database* db = ctx.db();
Collection* collection = db->getCollection(opCtx, _nss);
if (!collection) {
- _errmsg = str::stream() << "collection dropped during migration: " << _nss.ns();
- warning() << _errmsg;
- setState(FAIL);
+ setStateFailWarn(str::stream() << "collection dropped during migration: " << _nss.ns());
return;
}
@@ -560,30 +558,27 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
if (!indexSpecs.empty()) {
// Only copy indexes if the collection does not have any documents.
if (collection->numRecords(opCtx) > 0) {
- _errmsg = str::stream() << "aborting migration, shard is missing "
- << indexSpecs.size() << " indexes and "
- << "collection is not empty. Non-trivial "
- << "index creation should be scheduled manually";
- warning() << _errmsg;
- setState(FAIL);
+ setStateFailWarn(str::stream() << "aborting migration, shard is missing "
+ << indexSpecs.size()
+ << " indexes and "
+ << "collection is not empty. Non-trivial "
+ << "index creation should be scheduled manually");
return;
}
auto indexInfoObjs = indexer.init(indexSpecs);
if (!indexInfoObjs.isOK()) {
- _errmsg = str::stream() << "failed to create index before migrating data. "
- << " error: " << redact(indexInfoObjs.getStatus());
- warning() << _errmsg;
- setState(FAIL);
+ setStateFailWarn(str::stream() << "failed to create index before migrating data. "
+ << " error: "
+ << redact(indexInfoObjs.getStatus()));
return;
}
auto status = indexer.insertAllDocumentsInCollection();
if (!status.isOK()) {
- _errmsg = str::stream() << "failed to create index before migrating data. "
- << " error: " << redact(status);
- warning() << _errmsg;
- setState(FAIL);
+ setStateFailWarn(str::stream() << "failed to create index before migrating data. "
+ << " error: "
+ << redact(status));
return;
}
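The index-creation block above only builds the shard's missing indexes when the destination collection is still empty; otherwise the migration aborts so a potentially expensive index build can be scheduled manually. A condensed stand-in of that guard (names and types are illustrative, not the real catalog API):

    // Sketch: build missing indexes only on an empty collection, otherwise abort.
    #include <cstddef>
    #include <string>
    #include <vector>

    struct IndexCheckResult {
        bool ok;
        std::string errmsg;
    };

    IndexCheckResult checkMissingIndexes(const std::vector<std::string>& missingIndexSpecs,
                                         std::size_t numRecords) {
        if (missingIndexSpecs.empty()) {
            return {true, ""};  // nothing to build
        }
        if (numRecords > 0) {
            return {false,
                    "aborting migration, shard is missing " +
                        std::to_string(missingIndexSpecs.size()) +
                        " indexes and collection is not empty; schedule index creation manually"};
        }
        return {true, ""};  // empty collection: safe to build indexes before copying documents
    }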
@@ -604,30 +599,21 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
}
{
- // 2. Synchronously delete any data which might have been left orphaned in range
- // being moved
-
- RangeDeleterOptions deleterOptions(
- KeyRange(_nss.ns(), min.getOwned(), max.getOwned(), shardKeyPattern));
- deleterOptions.writeConcern = writeConcern;
-
- // No need to wait since all existing cursors will filter out this range when returning
- // the results
- deleterOptions.waitForOpenCursors = false;
- deleterOptions.fromMigrate = true;
- deleterOptions.onlyRemoveOrphanedDocs = true;
- deleterOptions.removeSaverReason = "preCleanup";
-
- if (!getDeleter()->deleteNow(opCtx, deleterOptions, &_errmsg)) {
- warning() << "Failed to queue delete for migrate abort: " << redact(_errmsg);
- setState(FAIL);
+ // 2. Synchronously delete any data which might have been left orphaned in the range
+ // being moved, and wait for completion
+
+ auto footprint = ChunkRange(min, max);
+ Status status = _notePending(opCtx, _nss, epoch, footprint);
+ if (!status.isOK()) {
+ setStateFail(status.reason());
return;
}
- Status status = _notePending(opCtx, _nss, min, max, epoch);
+ _chunkMarkedPending = true; // no lock needed, only the migrate thread looks.
+
+ status = CollectionShardingState::waitForClean(opCtx, _nss, epoch, footprint);
if (!status.isOK()) {
- _errmsg = status.reason();
- setState(FAIL);
+ setStateFail(status.reason());
return;
}
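This hunk is the heart of the change: instead of invoking the old RangeDeleter synchronously, the destination first registers the incoming range as pending via _notePending() (which, per the rewritten function below, calls CollectionShardingState::beginReceive() to start clearing any leftovers in the range), then blocks in CollectionShardingState::waitForClean() until that cleanup has completed before cloning begins. A rough sketch of the two-step flow with simplified stand-in types:

    // Sketch of the revised "step 2": register the incoming range, then wait until any
    // scheduled orphan cleanup over that range has finished. All types are stand-ins.
    #include <string>

    struct Status {
        bool ok;
        std::string reason;
        static Status OK() { return {true, ""}; }
    };

    struct ChunkRange { /* min/max shard-key bounds of the incoming chunk */ };

    Status notePending(const ChunkRange&) { return Status::OK(); }   // schedules orphan cleanup
    Status waitForClean(const ChunkRange&) { return Status::OK(); }  // blocks until it completes

    bool prepareToReceive(const ChunkRange& footprint, std::string* errmsg) {
        Status status = notePending(footprint);
        if (!status.ok) {
            *errmsg = status.reason;  // analogous to setStateFail(status.reason())
            return false;
        }
        status = waitForClean(footprint);
        if (!status.ok) {
            *errmsg = status.reason;
            return false;
        }
        return true;  // safe to start cloning documents into the range
    }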
@@ -646,10 +632,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
if (!conn->runCommand("admin",
migrateCloneRequest,
res)) { // gets array of objects to copy, in disk order
- setState(FAIL);
- _errmsg = "_migrateClone failed: ";
- _errmsg += redact(res.toString());
- log() << _errmsg;
+ setStateFail(str::stream() << "_migrateClone failed: " << redact(res.toString()));
conn.done();
return;
}
@@ -736,10 +719,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
while (true) {
BSONObj res;
if (!conn->runCommand("admin", xferModsRequest, res)) {
- setState(FAIL);
- _errmsg = "_transferMods failed: ";
- _errmsg += redact(res);
- log() << "_transferMods failed: " << redact(res);
+ setStateFail(str::stream() << "_transferMods failed: " << redact(res));
conn.done();
return;
}
@@ -772,10 +752,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
}
if (i == maxIterations) {
- _errmsg = "secondary can't keep up with migrate";
- log() << _errmsg;
+ setStateFail("secondary can't keep up with migrate");
conn.done();
- setState(FAIL);
return;
}
}
@@ -806,9 +784,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
}
if (t.minutes() >= 600) {
- setState(FAIL);
- _errmsg = "Cannot go to critical section because secondaries cannot keep up";
- log() << _errmsg;
+ setStateFail("Cannot go to critical section because secondaries cannot keep up");
return;
}
}
@@ -831,9 +807,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
BSONObj res;
if (!conn->runCommand("admin", xferModsRequest, res)) {
- log() << "_transferMods failed in STEADY state: " << redact(res);
- _errmsg = res.toString();
- setState(FAIL);
+ setStateFail(str::stream() << "_transferMods failed in STEADY state: "
+ << redact(res));
conn.done();
return;
}
@@ -863,8 +838,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
}
if (getState() == FAIL) {
- _errmsg = "timed out waiting for commit";
- log() << _errmsg;
+ setStateFail("timed out waiting for commit");
return;
}
@@ -992,12 +966,10 @@ bool MigrationDestinationManager::_flushPendingWrites(OperationContext* opCtx,
}
Status MigrationDestinationManager::_notePending(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& min,
- const BSONObj& max,
- const OID& epoch) {
+ NamespaceString const& nss,
+ OID const& epoch,
+ ChunkRange const& range) {
AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X);
-
auto css = CollectionShardingState::get(opCtx, nss);
auto metadata = css->getMetadata();
@@ -1005,66 +977,44 @@ Status MigrationDestinationManager::_notePending(OperationContext* opCtx,
// for checking this here is that in the future we shouldn't have this problem.
if (!metadata || metadata->getCollVersion().epoch() != epoch) {
return {ErrorCodes::StaleShardVersion,
- str::stream() << "could not note chunk [" << min << "," << max << ")"
- << " as pending because the epoch for "
+ str::stream() << "not noting chunk " << redact(range.toString())
+ << " as pending because the epoch of "
<< nss.ns()
- << " has changed from "
- << epoch
- << " to "
- << (metadata ? metadata->getCollVersion().epoch()
- : ChunkVersion::UNSHARDED().epoch())};
+ << " changed"};
}
- css->beginReceive(ChunkRange(min, max));
-
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- invariant(!_chunkMarkedPending);
- _chunkMarkedPending = true;
-
+ // start clearing any leftovers that would be in the new chunk
+ if (!css->beginReceive(range)) {
+ return {ErrorCodes::RangeOverlapConflict,
+ str::stream() << "Collection " << nss.ns() << " range " << redact(range.toString())
+ << " migration aborted; documents in range may still be in use on the"
+ " destination shard."};
+ }
return Status::OK();
}
-Status MigrationDestinationManager::_forgetPending(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& min,
- const BSONObj& max,
- const OID& epoch) {
- {
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- if (!_chunkMarkedPending) {
- return Status::OK();
- }
+void MigrationDestinationManager::_forgetPending(OperationContext* opCtx,
+ const NamespaceString& nss,
+ OID const& epoch,
+ ChunkRange const& range) {
- _chunkMarkedPending = false;
+ if (!_chunkMarkedPending) { // (no lock needed, only the migrate thread looks at this.)
+ return; // no documents can have been moved in, so there is nothing to clean up.
}
AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X);
-
auto css = CollectionShardingState::get(opCtx, nss);
auto metadata = css->getMetadata();
- // This can currently happen because drops aren't synchronized with in-migrations. The idea
- // for checking this here is that in the future we shouldn't have this problem.
+ // This can currently happen because drops aren't synchronized with in-migrations. The idea for
+ // checking this here is that in the future we shouldn't have this problem.
if (!metadata || metadata->getCollVersion().epoch() != epoch) {
- return {ErrorCodes::StaleShardVersion,
- str::stream() << "no need to forget pending chunk "
- << "["
- << min
- << ","
- << max
- << ")"
- << " because the epoch for "
- << nss.ns()
- << " has changed from "
- << epoch
- << " to "
- << (metadata ? metadata->getCollVersion().epoch()
- : ChunkVersion::UNSHARDED().epoch())};
+ log() << "no need to forget pending chunk " << redact(range.toString())
+ << " because the epoch for " << nss.ns() << " changed";
+ return;
}
- css->forgetReceive(ChunkRange(min, max));
-
- return Status::OK();
+ css->forgetReceive(range);
}
} // namespace mongo
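Taken together, the rewritten _notePending() and _forgetPending() delegate the pending-range bookkeeping to CollectionShardingState: beginReceive() now reports an overlap with a range that may still be in use (surfaced as RangeOverlapConflict), and forgetReceive() undoes the registration when the migration does not complete, so the range-deletion machinery (note the new collection_range_deleter.h include) can clean up whatever was copied. A condensed sketch of that lifecycle, with stand-in types rather than the real sharding classes:

    // Sketch of the pending-range lifecycle on the destination shard. Range and the
    // container below are stand-ins for the real CollectionShardingState machinery.
    #include <iostream>
    #include <set>
    #include <string>

    struct Range {
        std::string min, max;
        bool operator<(const Range& o) const {
            return min < o.min || (min == o.min && max < o.max);
        }
    };

    class ShardingStateStub {
    public:
        // Mirrors beginReceive(): false means the range overlaps one still registered,
        // i.e. documents in it may still be in use, so the migration must abort.
        bool beginReceive(const Range& r) { return _pending.insert(r).second; }

        // Mirrors forgetReceive(): drop the registration so leftovers can be deleted.
        void forgetReceive(const Range& r) { _pending.erase(r); }

    private:
        std::set<Range> _pending;
    };

    int main() {
        ShardingStateStub css;
        Range range{"{x: 0}", "{x: 100}"};
        if (!css.beginReceive(range)) {
            std::cout << "RangeOverlapConflict: migration aborted\n";
            return 1;
        }
        bool reachedDone = false;  // pretend this migration failed part-way
        if (!reachedDone) {
            css.forgetReceive(range);  // only partial data was moved in; allow cleanup
        }
        return 0;
    }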