diff options
-rw-r--r-- | jstests/sharding/bulk_shard_insert.js | 21 | ||||
-rw-r--r-- | src/mongo/db/s/active_migrations_registry.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp | 30 | ||||
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.cpp | 85 | ||||
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.h | 15 | ||||
-rw-r--r-- | src/mongo/db/s/migration_destination_manager_legacy_commands.cpp | 25 | ||||
-rw-r--r-- | src/mongo/db/s/migration_session_id.cpp | 19 | ||||
-rw-r--r-- | src/mongo/db/s/migration_session_id.h | 10 | ||||
-rw-r--r-- | src/mongo/db/s/migration_session_id_test.cpp | 18 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_state.cpp | 7 |
10 files changed, 114 insertions, 125 deletions
diff --git a/jstests/sharding/bulk_shard_insert.js b/jstests/sharding/bulk_shard_insert.js index 943fe270ba0..ae4626b8fdb 100644 --- a/jstests/sharding/bulk_shard_insert.js +++ b/jstests/sharding/bulk_shard_insert.js @@ -1,5 +1,7 @@ -// Test bulk inserts with sharding +// Test bulk inserts running alongside the auto-balancer. Ensures that they do not conflict with each +// other. (function() { + 'use strict'; // Setup randomized test var seed = new Date().getTime(); @@ -8,7 +10,7 @@ Random.srand(seed); print("Seeded with " + seed); - var st = new ShardingTest({name: jsTestName(), shards: 4, chunkSize: 1}); + var st = new ShardingTest({shards: 4, chunkSize: 1}); // Setup sharded collection var mongos = st.s0; @@ -19,8 +21,7 @@ // Insert lots of bulk documents var numDocs = 1000000; - var bulkSize = Math.floor(Random.rand() * 1000) + 2; - bulkSize = 4000; + var bulkSize = 4000; var docSize = 128; /* bytes */ print("\n\n\nBulk size is " + bulkSize); @@ -63,19 +64,9 @@ st.printShardingStatus(); var count = coll.find().count(); - var itcount = count; // coll.find().itcount() - - print("Inserted " + docsInserted + " count : " + count + " itcount : " + itcount); - - st.startBalancer(); - - var count = coll.find().count(); var itcount = coll.find().itcount(); - print("Inserted " + docsInserted + " count : " + count + " itcount : " + itcount); - - // SERVER-3645 - // assert.eq( docsInserted, count ) assert.eq(docsInserted, itcount); + st.stop(); })(); diff --git a/src/mongo/db/s/active_migrations_registry.cpp b/src/mongo/db/s/active_migrations_registry.cpp index 93f06c813db..ca38749c373 100644 --- a/src/mongo/db/s/active_migrations_registry.cpp +++ b/src/mongo/db/s/active_migrations_registry.cpp @@ -56,10 +56,11 @@ StatusWith<ScopedRegisterMigration> ActiveMigrationsRegistry::registerMigration( return {ScopedRegisterMigration(nullptr, false, _activeMoveChunkState->notification)}; } - return {ErrorCodes::ConflictingOperationInProgress, - str::stream() - << "Unable start new 
migration because this shard is currently donating chunk for " - << _activeMoveChunkState->args.getNss().ns()}; + return { + ErrorCodes::ConflictingOperationInProgress, + str::stream() + << "Unable to start new migration because this shard is currently donating chunk for " + << _activeMoveChunkState->args.getNss().ns()}; } boost::optional<NamespaceString> ActiveMigrationsRegistry::getActiveMigrationNss() { diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index 26c5a8a209d..d73e58cbfe4 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -73,10 +73,11 @@ bool isInRange(const BSONObj& obj, return k.woCompare(min) >= 0 && k.woCompare(max) < 0; } -BSONObj createRecvChunkCommitRequest(const NamespaceString& nss, - const MigrationSessionId& sessionId) { +BSONObj createRequestWithSessionId(StringData commandName, + const NamespaceString& nss, + const MigrationSessionId& sessionId) { BSONObjBuilder builder; - builder.append(kRecvChunkCommit, nss.ns()); + builder.append(commandName, nss.ns()); sessionId.append(&builder); return builder.obj(); } @@ -179,7 +180,13 @@ MigrationChunkClonerSourceLegacy::~MigrationChunkClonerSourceLegacy() { Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* txn) { invariant(!txn->lockState()->isLocked()); - auto scopedGuard = MakeGuard([&] { cancelClone(txn); }); + + // TODO (Kal): This can be changed to cancelClone after 3.4 is released. The reason to only do + // internal cleanup in 3.4 is for backwards compatibility with 3.2 nodes, which cannot + // differentiate between cancellations for different migration sessions. It is thus possible + // that a second migration from a different donor, but to the same recipient, would abort an + // already running migration. 
+ auto scopedGuard = MakeGuard([&] { _cleanup(txn); }); // Resolve the donor and recipient shards and their connection string @@ -282,9 +289,17 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate( return {ErrorCodes::OperationFailed, "Data transfer error"}; } + auto migrationSessionIdStatus = MigrationSessionId::extractFromBSON(res); + if (!migrationSessionIdStatus.isOK()) { + return {ErrorCodes::OperationIncomplete, + str::stream() << "Unable to retrieve the id of the migration session due to " + << migrationSessionIdStatus.getStatus().toString()}; + } + if (res["ns"].str() != _args.getNss().ns() || res["from"].str() != _donorCS.toString() || !res["min"].isABSONObj() || res["min"].Obj().woCompare(_args.getMinKey()) != 0 || - !res["max"].isABSONObj() || res["max"].Obj().woCompare(_args.getMaxKey()) != 0) { + !res["max"].isABSONObj() || res["max"].Obj().woCompare(_args.getMaxKey()) != 0 || + !_sessionId.matches(migrationSessionIdStatus.getValue())) { // This can happen when the destination aborted the migration and received another // recvChunk before this thread sees the transition to the abort state. This is // currently possible only if multiple migrations are happening at once. 
This is an @@ -318,7 +333,8 @@ Status MigrationChunkClonerSourceLegacy::commitClone(OperationContext* txn) { invariant(!_cloneCompleted); } - auto responseStatus = _callRecipient(createRecvChunkCommitRequest(_args.getNss(), _sessionId)); + auto responseStatus = + _callRecipient(createRequestWithSessionId(kRecvChunkCommit, _args.getNss(), _sessionId)); if (responseStatus.isOK()) { _cleanup(txn); return Status::OK(); @@ -337,7 +353,7 @@ void MigrationChunkClonerSourceLegacy::cancelClone(OperationContext* txn) { return; } - _callRecipient(BSON(kRecvChunkAbort << _args.getNss().ns())); + _callRecipient(createRequestWithSessionId(kRecvChunkAbort, _args.getNss(), _sessionId)); _cleanup(txn); } diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index a23b6d04f3f..dac91d5ba69 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -318,26 +318,37 @@ Status MigrationDestinationManager::start(const string& ns, _migrateThreadHandle.join(); } - _migrateThreadHandle = stdx::thread([this, - ns, - sessionId, - min, - max, - shardKeyPattern, - fromShardConnString, - epoch, - writeConcern]() { - _migrateThread( - ns, sessionId, min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern); - }); + _migrateThreadHandle = stdx::thread( + [this, ns, min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern]() { + _migrateThread(ns, min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern); + }); return Status::OK(); } -void MigrationDestinationManager::abort() { +bool MigrationDestinationManager::abort(const MigrationSessionId& sessionId) { stdx::lock_guard<stdx::mutex> sl(_mutex); + + if (!_sessionId) { + return false; + } + + if (!_sessionId->matches(sessionId)) { + warning() << "received abort request from a stale session " << sessionId.toString() + << ". 
Current session is " << _sessionId->toString(); + return false; + } + _state = ABORT; _errmsg = "aborted"; + + return true; +} + +void MigrationDestinationManager::abortWithoutSessionIdCheck() { + stdx::lock_guard<stdx::mutex> sl(_mutex); + _state = ABORT; + _errmsg = "aborted without session id check"; } bool MigrationDestinationManager::startCommit(const MigrationSessionId& sessionId) { @@ -382,7 +393,6 @@ bool MigrationDestinationManager::startCommit(const MigrationSessionId& sessionI } void MigrationDestinationManager::_migrateThread(std::string ns, - MigrationSessionId sessionId, BSONObj min, BSONObj max, BSONObj shardKeyPattern, @@ -398,15 +408,8 @@ void MigrationDestinationManager::_migrateThread(std::string ns, } try { - _migrateDriver(opCtx.get(), - ns, - sessionId, - min, - max, - shardKeyPattern, - fromShardConnString, - epoch, - writeConcern); + _migrateDriver( + opCtx.get(), ns, min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern); } catch (std::exception& e) { { stdx::lock_guard<stdx::mutex> sl(_mutex); @@ -440,7 +443,6 @@ void MigrationDestinationManager::_migrateThread(std::string ns, void MigrationDestinationManager::_migrateDriver(OperationContext* txn, const string& ns, - const MigrationSessionId& sessionId, const BSONObj& min, const BSONObj& max, const BSONObj& shardKeyPattern, @@ -448,12 +450,13 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, const OID& epoch, const WriteConcernOptions& writeConcern) { invariant(isActive()); + invariant(_sessionId); invariant(!min.isEmpty()); invariant(!max.isEmpty()); - log() << "starting receiving-end of migration of chunk " << redact(min) << " -> " << redact(max) + log() << "Starting receiving end of migration of chunk " << redact(min) << " -> " << redact(max) << " for collection " << ns << " from " << fromShardConnString << " at epoch " - << epoch.toString(); + << epoch.toString() << " with session id " << *_sessionId; string errmsg; MoveTimingHelper timing(txn, 
"to", ns, min, max, 6 /* steps */, &errmsg, ShardId(), ShardId()); @@ -479,6 +482,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, { // 0. copy system.namespaces entry if collection doesn't already exist + OldClientWriteContext ctx(txn, ns); if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)) { errmsg = str::stream() << "Not primary during migration: " << ns @@ -610,12 +614,11 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, } timing.done(1); - MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep1); } { - // 2. delete any data already in range + // 2. Synchronously delete any data which might have been left orphaned in range being moved RangeDeleterOptions deleterOptions( KeyRange(ns, min.getOwned(), max.getOwned(), shardKeyPattern)); @@ -641,32 +644,14 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, } timing.done(2); - MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep2); } - State currentState = getState(); - if (currentState == FAIL || currentState == ABORT) { - string errMsg; - RangeDeleterOptions deleterOptions( - KeyRange(ns, min.getOwned(), max.getOwned(), shardKeyPattern)); - deleterOptions.writeConcern = writeConcern; - // No need to wait since all existing cursors will filter out this range when - // returning the results. - deleterOptions.waitForOpenCursors = false; - deleterOptions.fromMigrate = true; - deleterOptions.onlyRemoveOrphanedDocs = true; - - if (!getDeleter()->queueDelete(txn, deleterOptions, NULL /* notifier */, &errMsg)) { - warning() << "Failed to queue delete for migrate abort: " << redact(errMsg); - } - } - { // 3. 
Initial bulk clone setState(CLONE); - const BSONObj migrateCloneRequest = createMigrateCloneRequest(sessionId); + const BSONObj migrateCloneRequest = createMigrateCloneRequest(*_sessionId); while (true) { BSONObj res; @@ -742,7 +727,6 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, } timing.done(3); - MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep3); } @@ -750,7 +734,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, // secondaries repl::OpTime lastOpApplied = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(); - const BSONObj xferModsRequest = createTransferModsRequest(sessionId); + const BSONObj xferModsRequest = createTransferModsRequest(*_sessionId); { // 4. Do bulk of mods @@ -806,7 +790,6 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, } timing.done(4); - MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep4); } @@ -896,13 +879,13 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, } timing.done(5); - MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep5); } setState(DONE); timing.done(6); MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep6); + conn.done(); } diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h index 12e437f6ed4..9de1c5027df 100644 --- a/src/mongo/db/s/migration_destination_manager.h +++ b/src/mongo/db/s/migration_destination_manager.h @@ -98,7 +98,18 @@ public: const OID& epoch, const WriteConcernOptions& writeConcern); - void abort(); + /** + * Idempotent method, which causes the current ongoing migration to abort only if it has the + * specified session id, otherwise returns false. If the migration is already aborted, does + * nothing. + */ + bool abort(const MigrationSessionId& sessionId); + + /** + * Same as 'abort' above, but unconditionally aborts the current migration without checking the + * session id. Only used for backwards compatibility. 
+ */ + void abortWithoutSessionIdCheck(); bool startCommit(const MigrationSessionId& sessionId); @@ -107,7 +118,6 @@ private: * Thread which drives the migration apply process on the recipient side. */ void _migrateThread(std::string ns, - MigrationSessionId sessionId, BSONObj min, BSONObj max, BSONObj shardKeyPattern, @@ -117,7 +127,6 @@ private: void _migrateDriver(OperationContext* txn, const std::string& ns, - const MigrationSessionId& sessionId, const BSONObj& min, const BSONObj& max, const BSONObj& shardKeyPattern, diff --git a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp index 7f4636e03d2..86df3fe2edb 100644 --- a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp +++ b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp @@ -93,12 +93,6 @@ public: BSONObjBuilder& result) { ShardingState* const shardingState = ShardingState::get(txn); - // Active state of TO-side migrations (MigrateStatus) is serialized by distributed - // collection lock. - uassert(ErrorCodes::ConflictingOperationInProgress, - "Shard is already serving as a destination for migration", - !shardingState->migrationDestinationManager()->isActive()); - // Pending deletes (for migrations) are serialized by the distributed collection lock, // we are sure we registered a delete for a range *before* we can migrate-in a // subrange. 
@@ -314,9 +308,22 @@ public: int, string& errmsg, BSONObjBuilder& result) { - ShardingState::get(txn)->migrationDestinationManager()->abort(); - ShardingState::get(txn)->migrationDestinationManager()->report(result); - return true; + auto const mdm = ShardingState::get(txn)->migrationDestinationManager(); + + auto migrationSessionIdStatus(MigrationSessionId::extractFromBSON(cmdObj)); + + if (migrationSessionIdStatus.isOK()) { + const bool ok = mdm->abort(migrationSessionIdStatus.getValue()); + mdm->report(result); + return ok; + } else if (migrationSessionIdStatus == ErrorCodes::NoSuchKey) { + mdm->abortWithoutSessionIdCheck(); + mdm->report(result); + return true; + } + + uassertStatusOK(migrationSessionIdStatus.getStatus()); + MONGO_UNREACHABLE; } } recvChunkAbortCommand; diff --git a/src/mongo/db/s/migration_session_id.cpp b/src/mongo/db/s/migration_session_id.cpp index 0e2234efdd4..4f617ac5df4 100644 --- a/src/mongo/db/s/migration_session_id.cpp +++ b/src/mongo/db/s/migration_session_id.cpp @@ -61,39 +61,26 @@ StatusWith<MigrationSessionId> MigrationSessionId::extractFromBSON(const BSONObj Status status = bsonExtractStringField(obj, kFieldName, &sessionId); if (status.isOK()) { return MigrationSessionId(sessionId); - } else if (status == ErrorCodes::NoSuchKey) { - return MigrationSessionId(); } return status; } -MigrationSessionId::MigrationSessionId() = default; - MigrationSessionId::MigrationSessionId(std::string sessionId) { invariant(!sessionId.empty()); _sessionId = std::move(sessionId); } bool MigrationSessionId::matches(const MigrationSessionId& other) const { - if (_sessionId && other._sessionId) - return *_sessionId == *other._sessionId; - - return !_sessionId && !other._sessionId; + return _sessionId == other._sessionId; } void MigrationSessionId::append(BSONObjBuilder* builder) const { - if (_sessionId) { - builder->append(kFieldName, *_sessionId); - } + builder->append(kFieldName, _sessionId); } std::string MigrationSessionId::toString() const { - 
return (_sessionId ? *_sessionId : ""); -} - -bool MigrationSessionId::isEmpty() const { - return !_sessionId; + return _sessionId; } } // namespace mongo diff --git a/src/mongo/db/s/migration_session_id.h b/src/mongo/db/s/migration_session_id.h index a16733e1dbe..0437fb5246a 100644 --- a/src/mongo/db/s/migration_session_id.h +++ b/src/mongo/db/s/migration_session_id.h @@ -56,13 +56,12 @@ public: /** * Extracts the session id from BSON. If the session id is missing from the BSON contents, - * returns an empty MigrationSessionId. + * returns a NoSuchKey error. */ static StatusWith<MigrationSessionId> extractFromBSON(const BSONObj& obj); /** - * Compares two session identifiers. Two idendifiers match if either both are empty (_sessionId - * is not set) or if the session ids match. + * Compares two session identifiers. Two identifiers match only if they are the same. */ bool matches(const MigrationSessionId& other) const; @@ -73,13 +72,10 @@ public: std::string toString() const; - bool isEmpty() const; - private: - MigrationSessionId(); explicit MigrationSessionId(std::string sessionId); - boost::optional<std::string> _sessionId{boost::none}; + std::string _sessionId; }; } // namespace mongo diff --git a/src/mongo/db/s/migration_session_id_test.cpp b/src/mongo/db/s/migration_session_id_test.cpp index 02625995941..9fb64bd21e4 100644 --- a/src/mongo/db/s/migration_session_id_test.cpp +++ b/src/mongo/db/s/migration_session_id_test.cpp @@ -54,23 +54,23 @@ TEST(MigrationSessionId, GenerateAndExtract) { } TEST(MigrationSessionId, Comparison) { - MigrationSessionId emptySessionId = - assertGet(MigrationSessionId::extractFromBSON(BSON("SomeField" << 1))); - MigrationSessionId nonEmptySessionId = + MigrationSessionId sessionId = assertGet(MigrationSessionId::extractFromBSON(BSON("SomeField" << 1 << "sessionId" << "TestSessionID"))); - ASSERT(!emptySessionId.matches(nonEmptySessionId)); - ASSERT(!nonEmptySessionId.matches(emptySessionId)); - MigrationSessionId 
sessionIdToCompare = assertGet(MigrationSessionId::extractFromBSON(BSON("SomeOtherField" << 1 << "sessionId" << "TestSessionID"))); - ASSERT(nonEmptySessionId.matches(sessionIdToCompare)); - ASSERT(sessionIdToCompare.matches(nonEmptySessionId)); + ASSERT(sessionId.matches(sessionIdToCompare)); + ASSERT(sessionIdToCompare.matches(sessionId)); +} + +TEST(MigrationSessionId, ErrorNoSuchKeyWhenSessionIdIsMissing) { + ASSERT_EQ(ErrorCodes::NoSuchKey, + MigrationSessionId::extractFromBSON(BSON("SomeField" << 1)).getStatus().code()); } -TEST(MigrationSessionId, ErrorWhenTypeIsNotString) { +TEST(MigrationSessionId, ErrorWhenSessionIdTypeIsNotString) { ASSERT_NOT_OK( MigrationSessionId::extractFromBSON(BSON("SomeField" << 1 << "sessionId" << Date_t::now())) .getStatus()); diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp index 95730271753..2eb14e6ba21 100644 --- a/src/mongo/db/s/sharding_state.cpp +++ b/src/mongo/db/s/sharding_state.cpp @@ -733,10 +733,9 @@ StatusWith<ChunkVersion> ShardingState::_refreshMetadata( StatusWith<ScopedRegisterMigration> ShardingState::registerMigration(const MoveChunkRequest& args) { if (_migrationDestManager.isActive()) { - return { - ErrorCodes::ConflictingOperationInProgress, - str::stream() - << "Unable start new migration because this shard is currently receiving a chunk"}; + return {ErrorCodes::ConflictingOperationInProgress, + str::stream() << "Unable to start new migration because this shard is currently " + "receiving a chunk"}; } return _activeMigrationsRegistry.registerMigration(args); |