diff options
author | Greg McKeon <greg.mckeon@mongodb.com> | 2017-09-27 14:32:40 -0400 |
---|---|---|
committer | Greg McKeon <greg.mckeon@mongodb.com> | 2017-09-27 14:32:40 -0400 |
commit | 522f7f7d36a4a71059dd2d5219c2a0f074dfd0a1 (patch) | |
tree | 89a030c9bfca2b0cab00dbd5e2051dc550d43cd1 /src/mongo/db/s | |
parent | 790cd20518f4aeef78780293b15842c32e7e4b55 (diff) | |
download | mongo-522f7f7d36a4a71059dd2d5219c2a0f074dfd0a1.tar.gz |
Revert "SERVER-30894 Implement command for transferring session information during migration"
This reverts commit 85d9721c00d7020af78fe60453f8362380fe697d.
Diffstat (limited to 'src/mongo/db/s')
16 files changed, 78 insertions, 351 deletions
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index 575c442ac74..9e6477c1d4b 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -267,9 +267,7 @@ boost::optional<KeyRange> CollectionShardingState::getNextOrphanRange(BSONObj co return _metadataManager->getNextOrphanRange(from); } -void CollectionShardingState::onInsertOp(OperationContext* opCtx, - const BSONObj& insertedDoc, - const Timestamp& oplogTs) { +void CollectionShardingState::onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc) { dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { @@ -293,16 +291,14 @@ void CollectionShardingState::onInsertOp(OperationContext* opCtx, checkShardVersionOrThrow(opCtx); if (_sourceMgr) { - _sourceMgr->getCloner()->onInsertOp(opCtx, insertedDoc, oplogTs); + _sourceMgr->getCloner()->onInsertOp(opCtx, insertedDoc); } } void CollectionShardingState::onUpdateOp(OperationContext* opCtx, const BSONObj& query, const BSONObj& update, - const BSONObj& updatedDoc, - const Timestamp& oplogTs, - const Timestamp& prePostImageTs) { + const BSONObj& updatedDoc) { dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { @@ -319,7 +315,7 @@ void CollectionShardingState::onUpdateOp(OperationContext* opCtx, checkShardVersionOrThrow(opCtx); if (_sourceMgr) { - _sourceMgr->getCloner()->onUpdateOp(opCtx, updatedDoc, oplogTs, prePostImageTs); + _sourceMgr->getCloner()->onUpdateOp(opCtx, updatedDoc); } } @@ -328,10 +324,7 @@ auto CollectionShardingState::makeDeleteState(BSONObj const& doc) -> DeleteState _sourceMgr && _sourceMgr->getCloner()->isDocumentInMigratingChunk(doc)}; } -void CollectionShardingState::onDeleteOp(OperationContext* opCtx, - const DeleteState& deleteState, - const Timestamp& oplogTs, - const Timestamp& preImageTs) { +void CollectionShardingState::onDeleteOp(OperationContext* opCtx, const DeleteState& deleteState) { dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { @@ -372,7 +365,7 @@ void CollectionShardingState::onDeleteOp(OperationContext* opCtx, checkShardVersionOrThrow(opCtx); if (_sourceMgr && deleteState.isMigrating) { - _sourceMgr->getCloner()->onDeleteOp(opCtx, deleteState.documentKey, oplogTs, preImageTs); + _sourceMgr->getCloner()->onDeleteOp(opCtx, deleteState.documentKey); } } diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h index 6f9722dceb9..5aa44336f70 100644 --- a/src/mongo/db/s/collection_sharding_state.h +++ b/src/mongo/db/s/collection_sharding_state.h @@ -49,7 +49,6 @@ struct ChunkVersion; class CollectionMetadata; class MigrationSourceManager; class OperationContext; -class Timestamp; /** * Contains all sharding-related runtime state for a given collection. One such object is assigned @@ -224,17 +223,12 @@ public: * * The global exclusive lock is expected to be held by the caller of any of these functions. */ - void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc, const Timestamp& oplogTs); + void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc); void onUpdateOp(OperationContext* opCtx, const BSONObj& query, const BSONObj& update, - const BSONObj& updatedDoc, - const Timestamp& oplogTs, - const Timestamp& prePostImageTs); - void onDeleteOp(OperationContext* opCtx, - const DeleteState& deleteState, - const Timestamp& oplogTs, - const Timestamp& preImageTs); + const BSONObj& updatedDoc); + void onDeleteOp(OperationContext* opCtx, const DeleteState& deleteState); void onDropCollection(OperationContext* opCtx, const NamespaceString& collectionName); private: diff --git a/src/mongo/db/s/collection_sharding_state_test.cpp b/src/mongo/db/s/collection_sharding_state_test.cpp index 9ac9f4a9702..844727e72c4 100644 --- a/src/mongo/db/s/collection_sharding_state_test.cpp +++ b/src/mongo/db/s/collection_sharding_state_test.cpp @@ -78,7 +78,7 @@ TEST_F(CollShardingStateTest, GlobalInitGetsCalledAfterWriteCommits) { shardIdentity.setClusterId(OID::gen()); WriteUnitOfWork wuow(operationContext()); - collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {}); + collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON()); ASSERT_EQ(0, getInitCallCount()); @@ -103,7 +103,7 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetCalledIfWriteAborts) { { WriteUnitOfWork wuow(operationContext()); - collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {}); + collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON()); ASSERT_EQ(0, getInitCallCount()); } @@ -125,7 +125,7 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetsCalledIfNSIsNotForShardIdentit shardIdentity.setClusterId(OID::gen()); WriteUnitOfWork wuow(operationContext()); - collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {}); + collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON()); ASSERT_EQ(0, getInitCallCount()); @@ -144,7 +144,7 @@ TEST_F(CollShardingStateTest, OnInsertOpThrowWithIncompleteShardIdentityDocument ShardIdentityType shardIdentity; shardIdentity.setShardName("a"); - ASSERT_THROWS(collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {}), + ASSERT_THROWS(collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON()), AssertionException); } @@ -156,7 +156,7 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetsCalledIfShardIdentityDocWasNot NamespaceString::kServerConfigurationNamespace); WriteUnitOfWork wuow(operationContext()); - collShardingState.onInsertOp(operationContext(), BSON("_id" << 1), {}); + collShardingState.onInsertOp(operationContext(), BSON("_id" << 1)); ASSERT_EQ(0, getInitCallCount()); diff --git a/src/mongo/db/s/migration_chunk_cloner_source.h b/src/mongo/db/s/migration_chunk_cloner_source.h index 9d9a1407f1d..750bcdcda80 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source.h +++ b/src/mongo/db/s/migration_chunk_cloner_source.h @@ -36,7 +36,6 @@ namespace mongo { class BSONObj; class OperationContext; class Status; -class Timestamp; /** * This class is responsible for producing chunk documents to be moved from donor to a recipient @@ -119,9 +118,7 @@ public: * * NOTE: Must be called with at least IX lock held on the collection. */ - virtual void onInsertOp(OperationContext* opCtx, - const BSONObj& insertedDoc, - const Timestamp& oplogTs) = 0; + virtual void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc) = 0; /** * Notifies this cloner that an update happened to the collection, which it owns. It is up to @@ -130,10 +127,7 @@ public: * * NOTE: Must be called with at least IX lock held on the collection. */ - virtual void onUpdateOp(OperationContext* opCtx, - const BSONObj& updatedDoc, - const Timestamp& oplogTs, - const Timestamp& prePostImageTs) = 0; + virtual void onUpdateOp(OperationContext* opCtx, const BSONObj& updatedDoc) = 0; /** * Notifies this cloner that a delede happened to the collection, which it owns. It is up to the @@ -142,10 +136,7 @@ public: * * NOTE: Must be called with at least IX lock held on the collection. */ - virtual void onDeleteOp(OperationContext* opCtx, - const BSONObj& deletedDocId, - const Timestamp& oplogTs, - const Timestamp& preImageTs) = 0; + virtual void onDeleteOp(OperationContext* opCtx, const BSONObj& deletedDocId) = 0; protected: MigrationChunkClonerSource(); diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index 65a75e4c9e0..70a76e5e473 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -135,14 +135,8 @@ public: */ LogOpForShardingHandler(MigrationChunkClonerSourceLegacy* cloner, const BSONObj& idObj, - const char op, - const Timestamp& oplogTs, - const Timestamp& prePostImageTs) - : _cloner(cloner), - _idObj(idObj.getOwned()), - _op(op), - _oplogTs(oplogTs), - _prePostImageTs(prePostImageTs) {} + const char op) + : _cloner(cloner), _idObj(idObj.getOwned()), _op(op) {} void commit() override { switch (_op) { @@ -162,14 +156,6 @@ public: default: MONGO_UNREACHABLE; } - - if (!_prePostImageTs.isNull()) { - _cloner->_sessionCatalogSource.notifyNewWriteTS(_prePostImageTs); - } - - if (!_oplogTs.isNull()) { - _cloner->_sessionCatalogSource.notifyNewWriteTS(_oplogTs); - } } void rollback() override {} @@ -178,8 +164,6 @@ private: MigrationChunkClonerSourceLegacy* const _cloner; const BSONObj _idObj; const char _op; - const Timestamp _oplogTs; - const Timestamp _prePostImageTs; }; MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequest request, @@ -191,8 +175,7 @@ MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequ _sessionId(MigrationSessionId::generate(_args.getFromShardId().toString(), _args.getToShardId().toString())), _donorConnStr(std::move(donorConnStr)), - _recipientHost(std::move(recipientHost)), - _sessionCatalogSource(_args.getNss()) {} + _recipientHost(std::move(recipientHost)) {} MigrationChunkClonerSourceLegacy::~MigrationChunkClonerSourceLegacy() { invariant(_state == kDone); @@ -209,9 +192,6 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx) { return storeCurrentLocsStatus; } - // Prime up the session migration source if there are oplog entries to migrate. - _sessionCatalogSource.fetchNextOplog(opCtx); - // Tell the recipient shard to start cloning BSONObjBuilder cmdBuilder; StartChunkCloneRequest::appendAsCommand(&cmdBuilder, @@ -334,12 +314,6 @@ Status MigrationChunkClonerSourceLegacy::commitClone(OperationContext* opCtx) { _callRecipient(createRequestWithSessionId(kRecvChunkCommit, _args.getNss(), _sessionId)); if (responseStatus.isOK()) { _cleanup(opCtx); - - if (_sessionCatalogSource.hasMoreOplog()) { - return {ErrorCodes::SessionTransferIncomplete, - "destination shard finished committing but there are still some session " - "metadata that needs to be transferred"}; - } return Status::OK(); } @@ -370,8 +344,7 @@ bool MigrationChunkClonerSourceLegacy::isDocumentInMigratingChunk(const BSONObj& } void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx, - const BSONObj& insertedDoc, - const Timestamp& oplogTs) { + const BSONObj& insertedDoc) { dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX)); BSONElement idElement = insertedDoc["_id"]; @@ -385,14 +358,11 @@ void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx, return; } - opCtx->recoveryUnit()->registerChange( - new LogOpForShardingHandler(this, idElement.wrap(), 'i', oplogTs, {})); + opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'i')); } void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx, - const BSONObj& updatedDoc, - const Timestamp& oplogTs, - const Timestamp& prePostImageTs) { + const BSONObj& updatedDoc) { dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX)); BSONElement idElement = updatedDoc["_id"]; @@ -406,14 +376,11 @@ void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx, return; } - opCtx->recoveryUnit()->registerChange( - new LogOpForShardingHandler(this, idElement.wrap(), 'u', oplogTs, prePostImageTs)); + opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'u')); } void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx, - const BSONObj& deletedDocId, - const Timestamp& oplogTs, - const Timestamp& preImageTs) { + const BSONObj& deletedDocId) { dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX)); BSONElement idElement = deletedDocId["_id"]; @@ -423,8 +390,7 @@ void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx, return; } - opCtx->recoveryUnit()->registerChange( - new LogOpForShardingHandler(this, idElement.wrap(), 'd', oplogTs, preImageTs)); + opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'd')); } uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() { @@ -713,27 +679,4 @@ void MigrationChunkClonerSourceLegacy::_xfer(OperationContext* opCtx, arr.done(); } -void MigrationChunkClonerSourceLegacy::nextSessionMigrationBatch(OperationContext* opCtx, - BSONArrayBuilder* arrBuilder) { - while (_sessionCatalogSource.hasMoreOplog()) { - auto oplogDoc = _sessionCatalogSource.getLastFetchedOplog(); - - if (oplogDoc.isEmpty()) { - // Last fetched turned out empty, try to see if there are more - _sessionCatalogSource.fetchNextOplog(opCtx); - continue; - } - - // Use the builder size instead of accumulating the document sizes directly so that we - // take into consideration the overhead of BSONArray indices. - if (arrBuilder->arrSize() && - (arrBuilder->len() + oplogDoc.objsize() + 1024) > BSONObjMaxUserSize) { - break; - } - - arrBuilder->append(oplogDoc); - _sessionCatalogSource.fetchNextOplog(opCtx); - } -} - } // namespace mongo diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h index 72d299b58e4..c072718475a 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h @@ -37,7 +37,6 @@ #include "mongo/db/query/plan_executor.h" #include "mongo/db/s/migration_chunk_cloner_source.h" #include "mongo/db/s/migration_session_id.h" -#include "mongo/db/s/session_catalog_migration_source.h" #include "mongo/s/move_chunk_request.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/stdx/memory.h" @@ -73,19 +72,11 @@ public: bool isDocumentInMigratingChunk(const BSONObj& doc) override; - void onInsertOp(OperationContext* opCtx, - const BSONObj& insertedDoc, - const Timestamp& oplogTs) override; + void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc) override; - void onUpdateOp(OperationContext* opCtx, - const BSONObj& updatedDoc, - const Timestamp& oplogTs, - const Timestamp& prePostImageTs) override; + void onUpdateOp(OperationContext* opCtx, const BSONObj& updatedDoc) override; - void onDeleteOp(OperationContext* opCtx, - const BSONObj& deletedDocId, - const Timestamp& oplogTs, - const Timestamp& preImageTs) override; + void onDeleteOp(OperationContext* opCtx, const BSONObj& deletedDocId) override; // Legacy cloner specific functionality @@ -130,8 +121,6 @@ public: */ Status nextModsBatch(OperationContext* opCtx, Database* db, BSONObjBuilder* builder); - void nextSessionMigrationBatch(OperationContext* opCtx, BSONArrayBuilder* arrBuilder); - private: friend class DeleteNotificationStage; friend class LogOpForShardingHandler; @@ -194,8 +183,6 @@ private: // during the cloning stage std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> _deleteNotifyExec; - SessionCatalogMigrationSource _sessionCatalogSource; - // Protects the entries below stdx::mutex _mutex; diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp index d42c6bdaa99..9e3e83774ee 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp @@ -216,57 +216,5 @@ public: } transferModsCommand; -/** - * Command for extracting the oplog entries that needs to be migrated for the given migration - * session id. - * Note: this command is not stateless. Calling this command has a side-effect of gradually - * depleting the buffer that contains the oplog entries to be transfered. - */ -class MigrateSessionCommand : public BasicCommand { -public: - MigrateSessionCommand() : BasicCommand("_getNextSessionMods") {} - - void help(std::stringstream& h) const { - h << "internal"; - } - - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { - return false; - } - - virtual bool slaveOk() const { - return false; - } - - virtual bool adminOnly() const { - return true; - } - - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) { - ActionSet actions; - actions.addAction(ActionType::internal); - out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); - } - - bool run(OperationContext* opCtx, - const std::string&, - const BSONObj& cmdObj, - BSONObjBuilder& result) { - const MigrationSessionId migrationSessionId( - uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj))); - - BSONArrayBuilder arrBuilder; - - AutoGetActiveCloner autoCloner(opCtx, migrationSessionId); - autoCloner.getCloner()->nextSessionMigrationBatch(opCtx, &arrBuilder); - - result.appendArray("oplog", arrBuilder.arr()); - return true; - } - -} migrateSessionCommand; - } // namespace } // namespace mongo diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp index a15f6487a33..193c237db5c 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp @@ -246,14 +246,14 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, CorrectDocumentsFetched) { WriteUnitOfWork wuow(operationContext()); - cloner.onInsertOp(operationContext(), createCollectionDocument(90), {}); - cloner.onInsertOp(operationContext(), createCollectionDocument(150), {}); - cloner.onInsertOp(operationContext(), createCollectionDocument(151), {}); - cloner.onInsertOp(operationContext(), createCollectionDocument(210), {}); - - cloner.onDeleteOp(operationContext(), createCollectionDocument(80), {}, {}); - cloner.onDeleteOp(operationContext(), createCollectionDocument(199), {}, {}); - cloner.onDeleteOp(operationContext(), createCollectionDocument(220), {}, {}); + cloner.onInsertOp(operationContext(), createCollectionDocument(90)); + cloner.onInsertOp(operationContext(), createCollectionDocument(150)); + cloner.onInsertOp(operationContext(), createCollectionDocument(151)); + cloner.onInsertOp(operationContext(), createCollectionDocument(210)); + + cloner.onDeleteOp(operationContext(), createCollectionDocument(80)); + cloner.onDeleteOp(operationContext(), createCollectionDocument(199)); + cloner.onDeleteOp(operationContext(), createCollectionDocument(220)); wuow.commit(); } diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index 86688ba08ca..ca2c736a6b1 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -57,8 +57,6 @@ #include "mongo/db/s/sharding_state.h" #include "mongo/db/service_context.h" #include "mongo/s/catalog/type_chunk.h" -#include "mongo/s/client/shard_registry.h" -#include "mongo/s/grid.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/stdx/chrono.h" #include "mongo/util/concurrency/notification.h" @@ -233,8 +231,6 @@ void MigrationDestinationManager::setStateFail(std::string msg) { _errmsg = std::move(msg); _state = FAIL; } - - _sessionMigration->forceFail(msg); } void MigrationDestinationManager::setStateFailWarn(std::string msg) { @@ -244,8 +240,6 @@ void MigrationDestinationManager::setStateFailWarn(std::string msg) { _errmsg = std::move(msg); _state = FAIL; } - - _sessionMigration->forceFail(msg); } bool MigrationDestinationManager::isActive() const { @@ -340,9 +334,6 @@ Status MigrationDestinationManager::start(const NamespaceString& nss, _migrateThreadHandle.join(); } - _sessionMigration = - stdx::make_unique<SessionCatalogMigrationDestination>(fromShard, *_sessionId); - _migrateThreadHandle = stdx::thread([this, min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern]() { _migrateThread(min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern); @@ -406,7 +397,6 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio << _sessionId->toString()}; } - _sessionMigration->finish(); _state = COMMIT_START; auto const deadline = Date_t::now() + Seconds(30); @@ -421,7 +411,6 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio if (_state != DONE) { return {ErrorCodes::CommandFailed, "startCommit failed, final data failed to transfer"}; } - return Status::OK(); } @@ -683,8 +672,6 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, // 3. Initial bulk clone setState(CLONE); - _sessionMigration->start(opCtx->getServiceContext()); - const BSONObj migrateCloneRequest = createMigrateCloneRequest(_nss, *_sessionId); _chunkMarkedPending = true; // no lock needed, only the migrate thread looks. @@ -842,15 +829,13 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, break; } - opCtx->sleepFor(Seconds(1)); + sleepsecs(1); } if (t.minutes() >= 600) { setStateFail("Cannot go to critical section because secondaries cannot keep up"); return; } - - _sessionMigration->waitUntilReadyToCommit(opCtx); } { @@ -910,12 +895,6 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep5); } - _sessionMigration->join(); - if (_sessionMigration->getState() == SessionCatalogMigrationDestination::State::ErrorOccurred) { - setStateFail(_sessionMigration->getErrMsg()); - return; - } - setState(DONE); timing.done(6); diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h index 49c6f1e5089..610f70196c1 100644 --- a/src/mongo/db/s/migration_destination_manager.h +++ b/src/mongo/db/s/migration_destination_manager.h @@ -39,7 +39,6 @@ #include "mongo/db/s/active_migrations_registry.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/migration_session_id.h" -#include "mongo/db/s/session_catalog_migration_destination.h" #include "mongo/s/shard_id.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" @@ -212,8 +211,6 @@ private: State _state{READY}; std::string _errmsg; - - std::unique_ptr<SessionCatalogMigrationDestination> _sessionMigration; }; } // namespace mongo diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp index 1c4c250dac1..c9344031555 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination.cpp @@ -201,6 +201,7 @@ BSONObj getNextSessionOplogBatch(OperationContext* opCtx, */ ProcessOplogResult processSessionOplog(OperationContext* opCtx, const BSONObj& oplogBSON, + // const Timestamp& prePostImageTs, const ProcessOplogResult& lastResult) { ProcessOplogResult result; auto oplogEntry = parseOplog(oplogBSON); @@ -239,62 +240,50 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx, const auto& sessionInfo = oplogEntry.getOperationSessionInfo(); result.sessionId = sessionInfo.getSessionId().value(); result.txnNum = sessionInfo.getTxnNumber().value(); - const auto stmtId = *oplogEntry.getStatementId(); auto scopedSession = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, result.sessionId); scopedSession->beginTxn(opCtx, result.txnNum); - if (scopedSession->checkStatementExecuted(opCtx, result.txnNum, stmtId)) { - return lastResult; - } - BSONObj object(result.isPrePostImage ? oplogEntry.getObject() : BSON(SessionCatalogMigrationDestination::kSessionMigrateOplogTag << 1)); auto oplogLink = extractPrePostImageTs(lastResult, oplogEntry); oplogLink.prevTs = scopedSession->getLastWriteOpTimeTs(result.txnNum); - writeConflictRetry( - opCtx, - "SessionOplogMigration", - NamespaceString::kSessionTransactionsTableNamespace.ns(), - [&] { - // Need to take global lock here so repl::logOp will not unlock it and trigger the - // invariant that disallows unlocking global lock while inside a WUOW. - // Grab a DBLock here instead of plain GlobalLock to make sure the MMAPV1 flush - // lock will be lock/unlocked correctly. Take the transaction table db lock to - // ensure the same lock ordering with normal replicated updates to the table. - Lock::DBLock lk( - opCtx, NamespaceString::kSessionTransactionsTableNamespace.db(), MODE_IX); - WriteUnitOfWork wunit(opCtx); - - result.oplogTime = repl::logOp(opCtx, - "n", - oplogEntry.getNamespace(), - oplogEntry.getUuid(), - object, - &object2, - true, - sessionInfo, - stmtId, - oplogLink); - - auto oplogTs = result.oplogTime.getTimestamp(); - uassert(40633, - str::stream() << "Failed to create new oplog entry for oplog with opTime: " - << oplogEntry.getOpTime().toString() - << ": " - << redact(oplogBSON), - !oplogTs.isNull()); - - // Do not call onWriteOpCompletedOnPrimary if we inserted a pre/post image, because - // the next oplog will contain the real operation. - if (!result.isPrePostImage) { - scopedSession->onWriteOpCompletedOnPrimary(opCtx, result.txnNum, {stmtId}, oplogTs); - } - - wunit.commit(); - }); + writeConflictRetry(opCtx, + "SessionOplogMigration", + NamespaceString::kSessionTransactionsTableNamespace.ns(), + [&] { + Lock::GlobalLock globalLock(opCtx, MODE_IX, UINT_MAX); + WriteUnitOfWork wunit(opCtx); + + result.oplogTime = repl::logOp(opCtx, + "n", + oplogEntry.getNamespace(), + oplogEntry.getUuid(), + object, + &object2, + true, + sessionInfo, + *oplogEntry.getStatementId(), + oplogLink); + + auto oplogTs = result.oplogTime.getTimestamp(); + uassert(40633, + str::stream() + << "Failed to create new oplog entry for oplog with opTime: " + << oplogEntry.getOpTime().toString() + << ": " + << redact(oplogBSON), + !oplogTs.isNull()); + + if (!result.isPrePostImage) { + scopedSession->onWriteOpCompletedOnPrimary( + opCtx, result.txnNum, {*oplogEntry.getStatementId()}, oplogTs); + } + + wunit.commit(); + }); return result; } @@ -318,7 +307,6 @@ void SessionCatalogMigrationDestination::start(ServiceContext* service) { stdx::lock_guard<stdx::mutex> lk(_mutex); invariant(_state == State::NotStarted); _state = State::Migrating; - _isStateChanged.notify_all(); } _thread = stdx::thread(stdx::bind( @@ -328,7 +316,6 @@ void SessionCatalogMigrationDestination::start(ServiceContext* service) { void SessionCatalogMigrationDestination::finish() { stdx::lock_guard<stdx::mutex> lk(_mutex); _state = State::Committing; - _isStateChanged.notify_all(); } void SessionCatalogMigrationDestination::join() { @@ -358,7 +345,6 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service // Timestamp prePostImageTs; ProcessOplogResult lastResult; - repl::OpTime lastOpTimeWaited; while (true) { { @@ -395,17 +381,8 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service // Note: only transition to "ready to commit" if state is not error/force stop. if (_state == State::Migrating) { _state = State::ReadyToCommit; - _isStateChanged.notify_all(); } } - - if (lastOpTimeWaited == lastResult.oplogTime) { - // We got an empty result at least twice in a row from the source shard so - // space it out a little bit so we don't hammer the shard. - opCtx->sleepFor(Milliseconds(200)); - } - - lastOpTimeWaited = lastResult.oplogTime; } while (oplogIter.more()) { @@ -434,7 +411,6 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service { stdx::lock_guard<stdx::mutex> lk(_mutex); _state = State::Done; - _isStateChanged.notify_all(); } } @@ -447,8 +423,6 @@ void SessionCatalogMigrationDestination::_errorOccurred(StringData errMsg) { stdx::lock_guard<stdx::mutex> lk(_mutex); _state = State::ErrorOccurred; _errMsg = errMsg.toString(); - - _isStateChanged.notify_all(); } SessionCatalogMigrationDestination::State SessionCatalogMigrationDestination::getState() { @@ -456,15 +430,4 @@ SessionCatalogMigrationDestination::State SessionCatalogMigrationDestination::ge return _state; } -void SessionCatalogMigrationDestination::forceFail(std::string& errMsg) { - _errorOccurred(errMsg); -} - -void SessionCatalogMigrationDestination::waitUntilReadyToCommit(OperationContext* opCtx) { - stdx::unique_lock<stdx::mutex> lk(_mutex); - while (_state == State::Migrating) { - opCtx->waitForConditionOrInterrupt(_isStateChanged, lk); - } -} - } // namespace mongo diff --git a/src/mongo/db/s/session_catalog_migration_destination.h b/src/mongo/db/s/session_catalog_migration_destination.h index a67bd8012f6..2e740f2f6bb 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.h +++ b/src/mongo/db/s/session_catalog_migration_destination.h @@ -37,7 +37,6 @@ #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/s/migration_session_id.h" #include "mongo/s/shard_id.h" -#include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" #include "mongo/util/concurrency/with_lock.h" @@ -89,17 +88,6 @@ public: void join(); /** - * Forces this into an error state which will also stop session transfer thread. - */ - void forceFail(std::string& errMsg); - - /** - * Blocks until state changes is not Migrating. In other words, can return when state - * becomes ReadyToCommit/Done/ErrorOccurred, etc. - */ - void waitUntilReadyToCommit(OperationContext* opCtx); - - /** * Returns the current state. */ State getState(); @@ -121,7 +109,6 @@ private: // Protects _state and _errMsg. stdx::mutex _mutex; - stdx::condition_variable _isStateChanged; State _state = State::NotStarted; std::string _errMsg; // valid only if _state == ErrorOccurred. }; diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp index 1ec90ca88b1..a1a76dfbd1d 100644 --- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp @@ -1303,59 +1303,6 @@ TEST_F(SessionCatalogMigrationDestinationTest, ASSERT_FALSE(sessionMigration.getErrMsg().empty()); } -TEST_F(SessionCatalogMigrationDestinationTest, ShouldIgnoreAlreadyExecutedStatements) { - const NamespaceString kNs("a.b"); - const auto sessionId = makeLogicalSessionIdForTest(); - - auto opCtx = operationContext(); - OperationSessionInfo sessionInfo; - sessionInfo.setSessionId(sessionId); - sessionInfo.setTxnNumber(19); - - insertDocWithSessionInfo(sessionInfo, kNs, BSON("_id" << 46), 30); - - SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); - sessionMigration.start(getServiceContext()); - sessionMigration.finish(); - - OplogEntry oplog1( - OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100)); - oplog1.setOperationSessionInfo(sessionInfo); - oplog1.setStatementId(23); - - OplogEntry oplog2(OpTime(Timestamp(70, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80)); - oplog2.setOperationSessionInfo(sessionInfo); - oplog2.setStatementId(30); - - OplogEntry oplog3(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80)); - oplog3.setOperationSessionInfo(sessionInfo); - oplog3.setStatementId(45); - - returnOplog({oplog1, oplog2, oplog3}); - returnOplog({}); - - sessionMigration.join(); - - ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState()); - - auto session = getSessionWithTxn(opCtx, sessionId, 19); - TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(19)); - - ASSERT_TRUE(historyIter.hasNext()); - checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx)); - - ASSERT_TRUE(historyIter.hasNext()); - checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx)); - - ASSERT_TRUE(historyIter.hasNext()); - auto firstInsertOplog = historyIter.next(opCtx); - - ASSERT_TRUE(firstInsertOplog.getOpType() == OpTypeEnum::kInsert); - ASSERT_BSONOBJ_EQ(BSON("_id" << 46), firstInsertOplog.getObject()); - ASSERT_TRUE(firstInsertOplog.getStatementId()); - ASSERT_EQ(30, *firstInsertOplog.getStatementId()); -} - } // namespace } // namespace mongo diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp index 829d862b370..e79656ba94b 100644 --- a/src/mongo/db/s/session_catalog_migration_source.cpp +++ b/src/mongo/db/s/session_catalog_migration_source.cpp @@ -76,6 +76,7 @@ BSONObj SessionCatalogMigrationSource::getLastFetchedOplog() { { stdx::lock_guard<stdx::mutex> _lk(_newOplogMutex); + invariant(!_lastFetchedNewWriteOplog.isEmpty()); return _lastFetchedNewWriteOplog; } } @@ -100,13 +101,11 @@ bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationConte return false; } - auto nextOplogBSON = nextOplog.toBSON().getOwned(); + _lastFetchedOplog = nextOplog.toBSON().getOwned(); + auto doc = fetchPrePostImageOplog(opCtx, nextOplog); if (!doc.isEmpty()) { - _lastFetchedOplogBuffer.push_back(nextOplogBSON); - _lastFetchedOplog = doc; - } else { - _lastFetchedOplog = nextOplogBSON; + _lastFetchedOplogBuffer.push_back(doc); } return true; diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h index 5e44531d31c..42a3e71e372 100644 --- a/src/mongo/db/s/session_catalog_migration_source.h +++ b/src/mongo/db/s/session_catalog_migration_source.h @@ -68,7 +68,6 @@ public: /** * Returns the oplog document that was last fetched by the fetchNextOplog call. - * Returns an empty object if there are no oplog to fetch. */ BSONObj getLastFetchedOplog(); diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp index ff43cce5d0b..ad4942292ea 100644 --- a/src/mongo/db/s/session_catalog_migration_source_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp @@ -216,7 +216,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd SessionCatalogMigrationSource migrationSource(kNs); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); - auto expectedSequece = {entry3.toBSON(), entry4.toBSON(), entry1.toBSON(), entry2.toBSON()}; + auto expectedSequece = {entry4.toBSON(), entry3.toBSON(), entry2.toBSON(), entry1.toBSON()}; for (auto oplogDoc : expectedSequece) { ASSERT_TRUE(migrationSource.hasMoreOplog()); |