summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/sharding/bulk_shard_insert.js21
-rw-r--r--src/mongo/db/s/active_migrations_registry.cpp9
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp30
-rw-r--r--src/mongo/db/s/migration_destination_manager.cpp85
-rw-r--r--src/mongo/db/s/migration_destination_manager.h15
-rw-r--r--src/mongo/db/s/migration_destination_manager_legacy_commands.cpp25
-rw-r--r--src/mongo/db/s/migration_session_id.cpp19
-rw-r--r--src/mongo/db/s/migration_session_id.h10
-rw-r--r--src/mongo/db/s/migration_session_id_test.cpp18
-rw-r--r--src/mongo/db/s/sharding_state.cpp7
10 files changed, 114 insertions, 125 deletions
diff --git a/jstests/sharding/bulk_shard_insert.js b/jstests/sharding/bulk_shard_insert.js
index 943fe270ba0..ae4626b8fdb 100644
--- a/jstests/sharding/bulk_shard_insert.js
+++ b/jstests/sharding/bulk_shard_insert.js
@@ -1,5 +1,7 @@
-// Test bulk inserts with sharding
+// Test bulk inserts running alonside the auto-balancer. Ensures that they do not conflict with each
+// other.
(function() {
+ 'use strict';
// Setup randomized test
var seed = new Date().getTime();
@@ -8,7 +10,7 @@
Random.srand(seed);
print("Seeded with " + seed);
- var st = new ShardingTest({name: jsTestName(), shards: 4, chunkSize: 1});
+ var st = new ShardingTest({shards: 4, chunkSize: 1});
// Setup sharded collection
var mongos = st.s0;
@@ -19,8 +21,7 @@
// Insert lots of bulk documents
var numDocs = 1000000;
- var bulkSize = Math.floor(Random.rand() * 1000) + 2;
- bulkSize = 4000;
+ var bulkSize = 4000;
var docSize = 128; /* bytes */
print("\n\n\nBulk size is " + bulkSize);
@@ -63,19 +64,9 @@
st.printShardingStatus();
var count = coll.find().count();
- var itcount = count; // coll.find().itcount()
-
- print("Inserted " + docsInserted + " count : " + count + " itcount : " + itcount);
-
- st.startBalancer();
-
- var count = coll.find().count();
var itcount = coll.find().itcount();
-
print("Inserted " + docsInserted + " count : " + count + " itcount : " + itcount);
-
- // SERVER-3645
- // assert.eq( docsInserted, count )
assert.eq(docsInserted, itcount);
+ st.stop();
})();
diff --git a/src/mongo/db/s/active_migrations_registry.cpp b/src/mongo/db/s/active_migrations_registry.cpp
index 93f06c813db..ca38749c373 100644
--- a/src/mongo/db/s/active_migrations_registry.cpp
+++ b/src/mongo/db/s/active_migrations_registry.cpp
@@ -56,10 +56,11 @@ StatusWith<ScopedRegisterMigration> ActiveMigrationsRegistry::registerMigration(
return {ScopedRegisterMigration(nullptr, false, _activeMoveChunkState->notification)};
}
- return {ErrorCodes::ConflictingOperationInProgress,
- str::stream()
- << "Unable start new migration because this shard is currently donating chunk for "
- << _activeMoveChunkState->args.getNss().ns()};
+ return {
+ ErrorCodes::ConflictingOperationInProgress,
+ str::stream()
+ << "Unable to start new migration because this shard is currently donating chunk for "
+ << _activeMoveChunkState->args.getNss().ns()};
}
boost::optional<NamespaceString> ActiveMigrationsRegistry::getActiveMigrationNss() {
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 26c5a8a209d..d73e58cbfe4 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -73,10 +73,11 @@ bool isInRange(const BSONObj& obj,
return k.woCompare(min) >= 0 && k.woCompare(max) < 0;
}
-BSONObj createRecvChunkCommitRequest(const NamespaceString& nss,
- const MigrationSessionId& sessionId) {
+BSONObj createRequestWithSessionId(StringData commandName,
+ const NamespaceString& nss,
+ const MigrationSessionId& sessionId) {
BSONObjBuilder builder;
- builder.append(kRecvChunkCommit, nss.ns());
+ builder.append(commandName, nss.ns());
sessionId.append(&builder);
return builder.obj();
}
@@ -179,7 +180,13 @@ MigrationChunkClonerSourceLegacy::~MigrationChunkClonerSourceLegacy() {
Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* txn) {
invariant(!txn->lockState()->isLocked());
- auto scopedGuard = MakeGuard([&] { cancelClone(txn); });
+
+ // TODO (Kal): This can be changed to cancelClone after 3.4 is released. The reason to only do
+ // internal cleanup in 3.4 is for backwards compatibility with 3.2 nodes, which cannot
+ // differentiate between cancellations for different migration sessions. It is thus possible
+ // that a second migration from different donor, but the same recipient would certainly abort an
+ // already running migration.
+ auto scopedGuard = MakeGuard([&] { _cleanup(txn); });
// Resolve the donor and recipient shards and their connection string
@@ -282,9 +289,17 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate(
return {ErrorCodes::OperationFailed, "Data transfer error"};
}
+ auto migrationSessionIdStatus = MigrationSessionId::extractFromBSON(res);
+ if (!migrationSessionIdStatus.isOK()) {
+ return {ErrorCodes::OperationIncomplete,
+ str::stream() << "Unable to retrieve the id of the migration session due to "
+ << migrationSessionIdStatus.getStatus().toString()};
+ }
+
if (res["ns"].str() != _args.getNss().ns() || res["from"].str() != _donorCS.toString() ||
!res["min"].isABSONObj() || res["min"].Obj().woCompare(_args.getMinKey()) != 0 ||
- !res["max"].isABSONObj() || res["max"].Obj().woCompare(_args.getMaxKey()) != 0) {
+ !res["max"].isABSONObj() || res["max"].Obj().woCompare(_args.getMaxKey()) != 0 ||
+ !_sessionId.matches(migrationSessionIdStatus.getValue())) {
// This can happen when the destination aborted the migration and received another
// recvChunk before this thread sees the transition to the abort state. This is
// currently possible only if multiple migrations are happening at once. This is an
@@ -318,7 +333,8 @@ Status MigrationChunkClonerSourceLegacy::commitClone(OperationContext* txn) {
invariant(!_cloneCompleted);
}
- auto responseStatus = _callRecipient(createRecvChunkCommitRequest(_args.getNss(), _sessionId));
+ auto responseStatus =
+ _callRecipient(createRequestWithSessionId(kRecvChunkCommit, _args.getNss(), _sessionId));
if (responseStatus.isOK()) {
_cleanup(txn);
return Status::OK();
@@ -337,7 +353,7 @@ void MigrationChunkClonerSourceLegacy::cancelClone(OperationContext* txn) {
return;
}
- _callRecipient(BSON(kRecvChunkAbort << _args.getNss().ns()));
+ _callRecipient(createRequestWithSessionId(kRecvChunkAbort, _args.getNss(), _sessionId));
_cleanup(txn);
}
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index a23b6d04f3f..dac91d5ba69 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -318,26 +318,37 @@ Status MigrationDestinationManager::start(const string& ns,
_migrateThreadHandle.join();
}
- _migrateThreadHandle = stdx::thread([this,
- ns,
- sessionId,
- min,
- max,
- shardKeyPattern,
- fromShardConnString,
- epoch,
- writeConcern]() {
- _migrateThread(
- ns, sessionId, min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern);
- });
+ _migrateThreadHandle = stdx::thread(
+ [this, ns, min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern]() {
+ _migrateThread(ns, min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern);
+ });
return Status::OK();
}
-void MigrationDestinationManager::abort() {
+bool MigrationDestinationManager::abort(const MigrationSessionId& sessionId) {
stdx::lock_guard<stdx::mutex> sl(_mutex);
+
+ if (!_sessionId) {
+ return false;
+ }
+
+ if (!_sessionId->matches(sessionId)) {
+ warning() << "received abort request from a stale session " << sessionId.toString()
+ << ". Current session is " << _sessionId->toString();
+ return false;
+ }
+
_state = ABORT;
_errmsg = "aborted";
+
+ return true;
+}
+
+void MigrationDestinationManager::abortWithoutSessionIdCheck() {
+ stdx::lock_guard<stdx::mutex> sl(_mutex);
+ _state = ABORT;
+ _errmsg = "aborted without session id check";
}
bool MigrationDestinationManager::startCommit(const MigrationSessionId& sessionId) {
@@ -382,7 +393,6 @@ bool MigrationDestinationManager::startCommit(const MigrationSessionId& sessionI
}
void MigrationDestinationManager::_migrateThread(std::string ns,
- MigrationSessionId sessionId,
BSONObj min,
BSONObj max,
BSONObj shardKeyPattern,
@@ -398,15 +408,8 @@ void MigrationDestinationManager::_migrateThread(std::string ns,
}
try {
- _migrateDriver(opCtx.get(),
- ns,
- sessionId,
- min,
- max,
- shardKeyPattern,
- fromShardConnString,
- epoch,
- writeConcern);
+ _migrateDriver(
+ opCtx.get(), ns, min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern);
} catch (std::exception& e) {
{
stdx::lock_guard<stdx::mutex> sl(_mutex);
@@ -440,7 +443,6 @@ void MigrationDestinationManager::_migrateThread(std::string ns,
void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
const string& ns,
- const MigrationSessionId& sessionId,
const BSONObj& min,
const BSONObj& max,
const BSONObj& shardKeyPattern,
@@ -448,12 +450,13 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
const OID& epoch,
const WriteConcernOptions& writeConcern) {
invariant(isActive());
+ invariant(_sessionId);
invariant(!min.isEmpty());
invariant(!max.isEmpty());
- log() << "starting receiving-end of migration of chunk " << redact(min) << " -> " << redact(max)
+ log() << "Starting receiving end of migration of chunk " << redact(min) << " -> " << redact(max)
<< " for collection " << ns << " from " << fromShardConnString << " at epoch "
- << epoch.toString();
+ << epoch.toString() << " with session id " << *_sessionId;
string errmsg;
MoveTimingHelper timing(txn, "to", ns, min, max, 6 /* steps */, &errmsg, ShardId(), ShardId());
@@ -479,6 +482,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
{
// 0. copy system.namespaces entry if collection doesn't already exist
+
OldClientWriteContext ctx(txn, ns);
if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)) {
errmsg = str::stream() << "Not primary during migration: " << ns
@@ -610,12 +614,11 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
}
timing.done(1);
-
MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep1);
}
{
- // 2. delete any data already in range
+ // 2. Synchronously delete any data which might have been left orphaned in range being moved
RangeDeleterOptions deleterOptions(
KeyRange(ns, min.getOwned(), max.getOwned(), shardKeyPattern));
@@ -641,32 +644,14 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
}
timing.done(2);
-
MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep2);
}
- State currentState = getState();
- if (currentState == FAIL || currentState == ABORT) {
- string errMsg;
- RangeDeleterOptions deleterOptions(
- KeyRange(ns, min.getOwned(), max.getOwned(), shardKeyPattern));
- deleterOptions.writeConcern = writeConcern;
- // No need to wait since all existing cursors will filter out this range when
- // returning the results.
- deleterOptions.waitForOpenCursors = false;
- deleterOptions.fromMigrate = true;
- deleterOptions.onlyRemoveOrphanedDocs = true;
-
- if (!getDeleter()->queueDelete(txn, deleterOptions, NULL /* notifier */, &errMsg)) {
- warning() << "Failed to queue delete for migrate abort: " << redact(errMsg);
- }
- }
-
{
// 3. Initial bulk clone
setState(CLONE);
- const BSONObj migrateCloneRequest = createMigrateCloneRequest(sessionId);
+ const BSONObj migrateCloneRequest = createMigrateCloneRequest(*_sessionId);
while (true) {
BSONObj res;
@@ -742,7 +727,6 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
}
timing.done(3);
-
MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep3);
}
@@ -750,7 +734,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
// secondaries
repl::OpTime lastOpApplied = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp();
- const BSONObj xferModsRequest = createTransferModsRequest(sessionId);
+ const BSONObj xferModsRequest = createTransferModsRequest(*_sessionId);
{
// 4. Do bulk of mods
@@ -806,7 +790,6 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
}
timing.done(4);
-
MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep4);
}
@@ -896,13 +879,13 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
}
timing.done(5);
-
MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep5);
}
setState(DONE);
timing.done(6);
MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep6);
+
conn.done();
}
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index 12e437f6ed4..9de1c5027df 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -98,7 +98,18 @@ public:
const OID& epoch,
const WriteConcernOptions& writeConcern);
- void abort();
+ /**
+ * Idempotent method, which causes the current ongoing migration to abort only if it has the
+ * specified session id, otherwise returns false. If the migration is already aborted, does
+ * nothing.
+ */
+ bool abort(const MigrationSessionId& sessionId);
+
+ /**
+ * Same as 'abort' above, but unconditionally aborts the current migration without checking the
+ * session id. Only used for backwards compatibility.
+ */
+ void abortWithoutSessionIdCheck();
bool startCommit(const MigrationSessionId& sessionId);
@@ -107,7 +118,6 @@ private:
* Thread which drives the migration apply process on the recipient side.
*/
void _migrateThread(std::string ns,
- MigrationSessionId sessionId,
BSONObj min,
BSONObj max,
BSONObj shardKeyPattern,
@@ -117,7 +127,6 @@ private:
void _migrateDriver(OperationContext* txn,
const std::string& ns,
- const MigrationSessionId& sessionId,
const BSONObj& min,
const BSONObj& max,
const BSONObj& shardKeyPattern,
diff --git a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
index 7f4636e03d2..86df3fe2edb 100644
--- a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
+++ b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
@@ -93,12 +93,6 @@ public:
BSONObjBuilder& result) {
ShardingState* const shardingState = ShardingState::get(txn);
- // Active state of TO-side migrations (MigrateStatus) is serialized by distributed
- // collection lock.
- uassert(ErrorCodes::ConflictingOperationInProgress,
- "Shard is already serving as a destination for migration",
- !shardingState->migrationDestinationManager()->isActive());
-
// Pending deletes (for migrations) are serialized by the distributed collection lock,
// we are sure we registered a delete for a range *before* we can migrate-in a
// subrange.
@@ -314,9 +308,22 @@ public:
int,
string& errmsg,
BSONObjBuilder& result) {
- ShardingState::get(txn)->migrationDestinationManager()->abort();
- ShardingState::get(txn)->migrationDestinationManager()->report(result);
- return true;
+ auto const mdm = ShardingState::get(txn)->migrationDestinationManager();
+
+ auto migrationSessionIdStatus(MigrationSessionId::extractFromBSON(cmdObj));
+
+ if (migrationSessionIdStatus.isOK()) {
+ const bool ok = mdm->abort(migrationSessionIdStatus.getValue());
+ mdm->report(result);
+ return ok;
+ } else if (migrationSessionIdStatus == ErrorCodes::NoSuchKey) {
+ mdm->abortWithoutSessionIdCheck();
+ mdm->report(result);
+ return true;
+ }
+
+ uassertStatusOK(migrationSessionIdStatus.getStatus());
+ MONGO_UNREACHABLE;
}
} recvChunkAbortCommand;
diff --git a/src/mongo/db/s/migration_session_id.cpp b/src/mongo/db/s/migration_session_id.cpp
index 0e2234efdd4..4f617ac5df4 100644
--- a/src/mongo/db/s/migration_session_id.cpp
+++ b/src/mongo/db/s/migration_session_id.cpp
@@ -61,39 +61,26 @@ StatusWith<MigrationSessionId> MigrationSessionId::extractFromBSON(const BSONObj
Status status = bsonExtractStringField(obj, kFieldName, &sessionId);
if (status.isOK()) {
return MigrationSessionId(sessionId);
- } else if (status == ErrorCodes::NoSuchKey) {
- return MigrationSessionId();
}
return status;
}
-MigrationSessionId::MigrationSessionId() = default;
-
MigrationSessionId::MigrationSessionId(std::string sessionId) {
invariant(!sessionId.empty());
_sessionId = std::move(sessionId);
}
bool MigrationSessionId::matches(const MigrationSessionId& other) const {
- if (_sessionId && other._sessionId)
- return *_sessionId == *other._sessionId;
-
- return !_sessionId && !other._sessionId;
+ return _sessionId == other._sessionId;
}
void MigrationSessionId::append(BSONObjBuilder* builder) const {
- if (_sessionId) {
- builder->append(kFieldName, *_sessionId);
- }
+ builder->append(kFieldName, _sessionId);
}
std::string MigrationSessionId::toString() const {
- return (_sessionId ? *_sessionId : "");
-}
-
-bool MigrationSessionId::isEmpty() const {
- return !_sessionId;
+ return _sessionId;
}
} // namespace mongo
diff --git a/src/mongo/db/s/migration_session_id.h b/src/mongo/db/s/migration_session_id.h
index a16733e1dbe..0437fb5246a 100644
--- a/src/mongo/db/s/migration_session_id.h
+++ b/src/mongo/db/s/migration_session_id.h
@@ -56,13 +56,12 @@ public:
/**
* Extracts the session id from BSON. If the session id is missing from the BSON contents,
- * returns an empty MigrationSessionId.
+ * returns a NoSuchKey error.
*/
static StatusWith<MigrationSessionId> extractFromBSON(const BSONObj& obj);
/**
- * Compares two session identifiers. Two idendifiers match if either both are empty (_sessionId
- * is not set) or if the session ids match.
+ * Compares two session identifiers. Two idendifiers match only if they are the same.
*/
bool matches(const MigrationSessionId& other) const;
@@ -73,13 +72,10 @@ public:
std::string toString() const;
- bool isEmpty() const;
-
private:
- MigrationSessionId();
explicit MigrationSessionId(std::string sessionId);
- boost::optional<std::string> _sessionId{boost::none};
+ std::string _sessionId;
};
} // namespace mongo
diff --git a/src/mongo/db/s/migration_session_id_test.cpp b/src/mongo/db/s/migration_session_id_test.cpp
index 02625995941..9fb64bd21e4 100644
--- a/src/mongo/db/s/migration_session_id_test.cpp
+++ b/src/mongo/db/s/migration_session_id_test.cpp
@@ -54,23 +54,23 @@ TEST(MigrationSessionId, GenerateAndExtract) {
}
TEST(MigrationSessionId, Comparison) {
- MigrationSessionId emptySessionId =
- assertGet(MigrationSessionId::extractFromBSON(BSON("SomeField" << 1)));
- MigrationSessionId nonEmptySessionId =
+ MigrationSessionId sessionId =
assertGet(MigrationSessionId::extractFromBSON(BSON("SomeField" << 1 << "sessionId"
<< "TestSessionID")));
- ASSERT(!emptySessionId.matches(nonEmptySessionId));
- ASSERT(!nonEmptySessionId.matches(emptySessionId));
-
MigrationSessionId sessionIdToCompare =
assertGet(MigrationSessionId::extractFromBSON(BSON("SomeOtherField" << 1 << "sessionId"
<< "TestSessionID")));
- ASSERT(nonEmptySessionId.matches(sessionIdToCompare));
- ASSERT(sessionIdToCompare.matches(nonEmptySessionId));
+ ASSERT(sessionId.matches(sessionIdToCompare));
+ ASSERT(sessionIdToCompare.matches(sessionId));
+}
+
+TEST(MigrationSessionId, ErrorNoSuchKeyWhenSessionIdIsMissing) {
+ ASSERT_EQ(ErrorCodes::NoSuchKey,
+ MigrationSessionId::extractFromBSON(BSON("SomeField" << 1)).getStatus().code());
}
-TEST(MigrationSessionId, ErrorWhenTypeIsNotString) {
+TEST(MigrationSessionId, ErrorWhenSessionIdTypeIsNotString) {
ASSERT_NOT_OK(
MigrationSessionId::extractFromBSON(BSON("SomeField" << 1 << "sessionId" << Date_t::now()))
.getStatus());
diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp
index 95730271753..2eb14e6ba21 100644
--- a/src/mongo/db/s/sharding_state.cpp
+++ b/src/mongo/db/s/sharding_state.cpp
@@ -733,10 +733,9 @@ StatusWith<ChunkVersion> ShardingState::_refreshMetadata(
StatusWith<ScopedRegisterMigration> ShardingState::registerMigration(const MoveChunkRequest& args) {
if (_migrationDestManager.isActive()) {
- return {
- ErrorCodes::ConflictingOperationInProgress,
- str::stream()
- << "Unable start new migration because this shard is currently receiving a chunk"};
+ return {ErrorCodes::ConflictingOperationInProgress,
+ str::stream() << "Unable to start new migration because this shard is currently "
+ "receiving a chunk"};
}
return _activeMigrationsRegistry.registerMigration(args);