Diffstat (limited to 'src/mongo/db/s')
16 files changed, 403 insertions, 80 deletions
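Taken together, these changes plumb the oplog timestamp of each observed write from CollectionShardingState::onInsertOp/onUpdateOp/onDeleteOp down into the chunk cloner, which hands it to a SessionCatalogMigrationSource only after the write's storage transaction commits, and only when the operation context carries a transaction number (i.e. the write is part of a retryable write). The snippet below is a minimal, self-contained sketch of that notify-on-commit pattern, not the real implementation: Timestamp, Change, RecoveryUnit, SessionSource, LogOpForShardingHandler and onWriteObserved here are toy stand-ins for the corresponding MongoDB types and for the cloner hooks shown in the diff.

```cpp
#include <cstddef>
#include <iostream>
#include <memory>
#include <optional>
#include <vector>

// Toy stand-ins for the MongoDB types involved; names and shapes are assumptions of this sketch.
struct Timestamp {
    unsigned secs = 0, inc = 0;
    bool isNull() const { return secs == 0 && inc == 0; }
};

// Plays the role of SessionCatalogMigrationSource: remembers which committed writes still
// need their session history shipped to the recipient.
class SessionSource {
public:
    void notifyNewWriteTS(Timestamp ts) { _pending.push_back(ts); }
    std::size_t pendingWrites() const { return _pending.size(); }
private:
    std::vector<Timestamp> _pending;
};

// Equivalent of RecoveryUnit::Change: commit() runs only if the storage transaction commits.
struct Change {
    virtual ~Change() = default;
    virtual void commit() = 0;
    virtual void rollback() = 0;
};

// Simplified LogOpForShardingHandler: on commit, forward the pre/post image timestamp (if any)
// and then the write's own oplog timestamp to the session source.
class LogOpForShardingHandler : public Change {
public:
    LogOpForShardingHandler(SessionSource* source, Timestamp oplogTs, Timestamp prePostImageTs)
        : _source(source), _oplogTs(oplogTs), _prePostImageTs(prePostImageTs) {}

    void commit() override {
        if (!_prePostImageTs.isNull())
            _source->notifyNewWriteTS(_prePostImageTs);
        if (!_oplogTs.isNull())
            _source->notifyNewWriteTS(_oplogTs);
    }
    void rollback() override {}  // nothing was published before commit, so nothing to undo

private:
    SessionSource* const _source;
    const Timestamp _oplogTs;
    const Timestamp _prePostImageTs;
};

// Minimal recovery unit: buffers registered changes, replays them on commit, drops them on abort.
class RecoveryUnit {
public:
    void registerChange(std::unique_ptr<Change> change) { _changes.push_back(std::move(change)); }
    void commitUnitOfWork() {
        for (auto& c : _changes) c->commit();
        _changes.clear();
    }
    void abortUnitOfWork() { _changes.clear(); }
private:
    std::vector<std::unique_ptr<Change>> _changes;
};

// Mirrors the branch the diff adds to onInsertOp/onUpdateOp/onDeleteOp: only writes that carry a
// transaction number (retryable writes) have their timestamps recorded for session migration.
void onWriteObserved(RecoveryUnit& ru,
                     std::optional<long long> txnNumber,
                     Timestamp oplogTs,
                     Timestamp prePostImageTs,
                     SessionSource* source) {
    if (txnNumber) {
        ru.registerChange(
            std::make_unique<LogOpForShardingHandler>(source, oplogTs, prePostImageTs));
    } else {
        ru.registerChange(
            std::make_unique<LogOpForShardingHandler>(source, Timestamp{}, Timestamp{}));
    }
}

int main() {
    SessionSource source;
    RecoveryUnit ru;
    onWriteObserved(ru, 5, Timestamp{100, 1}, Timestamp{100, 2}, &source);       // retryable update
    onWriteObserved(ru, std::nullopt, Timestamp{100, 3}, Timestamp{}, &source);  // plain write
    ru.commitUnitOfWork();
    std::cout << "timestamps queued for session migration: " << source.pendingWrites() << "\n";  // 2
}
```

The property the sketch preserves is the one the diff relies on: nothing reaches the session migration source until commit() fires, so rolled-back writes never leak their timestamps into the transfer, and writes without a transaction number register a handler with empty timestamps and are therefore skipped.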
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index 9e6477c1d4b..575c442ac74 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -267,7 +267,9 @@ boost::optional<KeyRange> CollectionShardingState::getNextOrphanRange(BSONObj co return _metadataManager->getNextOrphanRange(from); } -void CollectionShardingState::onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc) { +void CollectionShardingState::onInsertOp(OperationContext* opCtx, + const BSONObj& insertedDoc, + const Timestamp& oplogTs) { dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { @@ -291,14 +293,16 @@ void CollectionShardingState::onInsertOp(OperationContext* opCtx, const BSONObj& checkShardVersionOrThrow(opCtx); if (_sourceMgr) { - _sourceMgr->getCloner()->onInsertOp(opCtx, insertedDoc); + _sourceMgr->getCloner()->onInsertOp(opCtx, insertedDoc, oplogTs); } } void CollectionShardingState::onUpdateOp(OperationContext* opCtx, const BSONObj& query, const BSONObj& update, - const BSONObj& updatedDoc) { + const BSONObj& updatedDoc, + const Timestamp& oplogTs, + const Timestamp& prePostImageTs) { dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { @@ -315,7 +319,7 @@ void CollectionShardingState::onUpdateOp(OperationContext* opCtx, checkShardVersionOrThrow(opCtx); if (_sourceMgr) { - _sourceMgr->getCloner()->onUpdateOp(opCtx, updatedDoc); + _sourceMgr->getCloner()->onUpdateOp(opCtx, updatedDoc, oplogTs, prePostImageTs); } } @@ -324,7 +328,10 @@ auto CollectionShardingState::makeDeleteState(BSONObj const& doc) -> DeleteState _sourceMgr && _sourceMgr->getCloner()->isDocumentInMigratingChunk(doc)}; } -void CollectionShardingState::onDeleteOp(OperationContext* opCtx, const DeleteState& deleteState) { +void CollectionShardingState::onDeleteOp(OperationContext* opCtx, + const DeleteState& deleteState, + const Timestamp& oplogTs, + const Timestamp& preImageTs) { dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { @@ -365,7 +372,7 @@ void CollectionShardingState::onDeleteOp(OperationContext* opCtx, const DeleteSt checkShardVersionOrThrow(opCtx); if (_sourceMgr && deleteState.isMigrating) { - _sourceMgr->getCloner()->onDeleteOp(opCtx, deleteState.documentKey); + _sourceMgr->getCloner()->onDeleteOp(opCtx, deleteState.documentKey, oplogTs, preImageTs); } } diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h index 5aa44336f70..6f9722dceb9 100644 --- a/src/mongo/db/s/collection_sharding_state.h +++ b/src/mongo/db/s/collection_sharding_state.h @@ -49,6 +49,7 @@ struct ChunkVersion; class CollectionMetadata; class MigrationSourceManager; class OperationContext; +class Timestamp; /** * Contains all sharding-related runtime state for a given collection. One such object is assigned @@ -223,12 +224,17 @@ public: * * The global exclusive lock is expected to be held by the caller of any of these functions. 
*/ - void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc); + void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc, const Timestamp& oplogTs); void onUpdateOp(OperationContext* opCtx, const BSONObj& query, const BSONObj& update, - const BSONObj& updatedDoc); - void onDeleteOp(OperationContext* opCtx, const DeleteState& deleteState); + const BSONObj& updatedDoc, + const Timestamp& oplogTs, + const Timestamp& prePostImageTs); + void onDeleteOp(OperationContext* opCtx, + const DeleteState& deleteState, + const Timestamp& oplogTs, + const Timestamp& preImageTs); void onDropCollection(OperationContext* opCtx, const NamespaceString& collectionName); private: diff --git a/src/mongo/db/s/collection_sharding_state_test.cpp b/src/mongo/db/s/collection_sharding_state_test.cpp index 844727e72c4..9ac9f4a9702 100644 --- a/src/mongo/db/s/collection_sharding_state_test.cpp +++ b/src/mongo/db/s/collection_sharding_state_test.cpp @@ -78,7 +78,7 @@ TEST_F(CollShardingStateTest, GlobalInitGetsCalledAfterWriteCommits) { shardIdentity.setClusterId(OID::gen()); WriteUnitOfWork wuow(operationContext()); - collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON()); + collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {}); ASSERT_EQ(0, getInitCallCount()); @@ -103,7 +103,7 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetCalledIfWriteAborts) { { WriteUnitOfWork wuow(operationContext()); - collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON()); + collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {}); ASSERT_EQ(0, getInitCallCount()); } @@ -125,7 +125,7 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetsCalledIfNSIsNotForShardIdentit shardIdentity.setClusterId(OID::gen()); WriteUnitOfWork wuow(operationContext()); - collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON()); + collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {}); ASSERT_EQ(0, getInitCallCount()); @@ -144,7 +144,7 @@ TEST_F(CollShardingStateTest, OnInsertOpThrowWithIncompleteShardIdentityDocument ShardIdentityType shardIdentity; shardIdentity.setShardName("a"); - ASSERT_THROWS(collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON()), + ASSERT_THROWS(collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {}), AssertionException); } @@ -156,7 +156,7 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetsCalledIfShardIdentityDocWasNot NamespaceString::kServerConfigurationNamespace); WriteUnitOfWork wuow(operationContext()); - collShardingState.onInsertOp(operationContext(), BSON("_id" << 1)); + collShardingState.onInsertOp(operationContext(), BSON("_id" << 1), {}); ASSERT_EQ(0, getInitCallCount()); diff --git a/src/mongo/db/s/migration_chunk_cloner_source.h b/src/mongo/db/s/migration_chunk_cloner_source.h index 750bcdcda80..9d9a1407f1d 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source.h +++ b/src/mongo/db/s/migration_chunk_cloner_source.h @@ -36,6 +36,7 @@ namespace mongo { class BSONObj; class OperationContext; class Status; +class Timestamp; /** * This class is responsible for producing chunk documents to be moved from donor to a recipient @@ -118,7 +119,9 @@ public: * * NOTE: Must be called with at least IX lock held on the collection. 
*/ - virtual void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc) = 0; + virtual void onInsertOp(OperationContext* opCtx, + const BSONObj& insertedDoc, + const Timestamp& oplogTs) = 0; /** * Notifies this cloner that an update happened to the collection, which it owns. It is up to @@ -127,7 +130,10 @@ public: * * NOTE: Must be called with at least IX lock held on the collection. */ - virtual void onUpdateOp(OperationContext* opCtx, const BSONObj& updatedDoc) = 0; + virtual void onUpdateOp(OperationContext* opCtx, + const BSONObj& updatedDoc, + const Timestamp& oplogTs, + const Timestamp& prePostImageTs) = 0; /** * Notifies this cloner that a delede happened to the collection, which it owns. It is up to the @@ -136,7 +142,10 @@ public: * * NOTE: Must be called with at least IX lock held on the collection. */ - virtual void onDeleteOp(OperationContext* opCtx, const BSONObj& deletedDocId) = 0; + virtual void onDeleteOp(OperationContext* opCtx, + const BSONObj& deletedDocId, + const Timestamp& oplogTs, + const Timestamp& preImageTs) = 0; protected: MigrationChunkClonerSource(); diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index 70a76e5e473..5d0db5dea0b 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -135,8 +135,14 @@ public: */ LogOpForShardingHandler(MigrationChunkClonerSourceLegacy* cloner, const BSONObj& idObj, - const char op) - : _cloner(cloner), _idObj(idObj.getOwned()), _op(op) {} + const char op, + const Timestamp& oplogTs, + const Timestamp& prePostImageTs) + : _cloner(cloner), + _idObj(idObj.getOwned()), + _op(op), + _oplogTs(oplogTs), + _prePostImageTs(prePostImageTs) {} void commit() override { switch (_op) { @@ -156,6 +162,14 @@ public: default: MONGO_UNREACHABLE; } + + if (!_prePostImageTs.isNull()) { + _cloner->_sessionCatalogSource.notifyNewWriteTS(_prePostImageTs); + } + + if (!_oplogTs.isNull()) { + _cloner->_sessionCatalogSource.notifyNewWriteTS(_oplogTs); + } } void rollback() override {} @@ -164,6 +178,8 @@ private: MigrationChunkClonerSourceLegacy* const _cloner; const BSONObj _idObj; const char _op; + const Timestamp _oplogTs; + const Timestamp _prePostImageTs; }; MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequest request, @@ -175,7 +191,8 @@ MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequ _sessionId(MigrationSessionId::generate(_args.getFromShardId().toString(), _args.getToShardId().toString())), _donorConnStr(std::move(donorConnStr)), - _recipientHost(std::move(recipientHost)) {} + _recipientHost(std::move(recipientHost)), + _sessionCatalogSource(_args.getNss()) {} MigrationChunkClonerSourceLegacy::~MigrationChunkClonerSourceLegacy() { invariant(_state == kDone); @@ -192,6 +209,9 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx) { return storeCurrentLocsStatus; } + // Prime up the session migration source if there are oplog entries to migrate. 
+ _sessionCatalogSource.fetchNextOplog(opCtx); + // Tell the recipient shard to start cloning BSONObjBuilder cmdBuilder; StartChunkCloneRequest::appendAsCommand(&cmdBuilder, @@ -314,6 +334,12 @@ Status MigrationChunkClonerSourceLegacy::commitClone(OperationContext* opCtx) { _callRecipient(createRequestWithSessionId(kRecvChunkCommit, _args.getNss(), _sessionId)); if (responseStatus.isOK()) { _cleanup(opCtx); + + if (_sessionCatalogSource.hasMoreOplog()) { + return {ErrorCodes::SessionTransferIncomplete, + "destination shard finished committing but there are still some session " + "metadata that needs to be transferred"}; + } return Status::OK(); } @@ -344,7 +370,8 @@ bool MigrationChunkClonerSourceLegacy::isDocumentInMigratingChunk(const BSONObj& } void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx, - const BSONObj& insertedDoc) { + const BSONObj& insertedDoc, + const Timestamp& oplogTs) { dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX)); BSONElement idElement = insertedDoc["_id"]; @@ -358,11 +385,19 @@ void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx, return; } - opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'i')); + if (opCtx->getTxnNumber()) { + opCtx->recoveryUnit()->registerChange( + new LogOpForShardingHandler(this, idElement.wrap(), 'i', oplogTs, {})); + } else { + opCtx->recoveryUnit()->registerChange( + new LogOpForShardingHandler(this, idElement.wrap(), 'i', {}, {})); + } } void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx, - const BSONObj& updatedDoc) { + const BSONObj& updatedDoc, + const Timestamp& oplogTs, + const Timestamp& prePostImageTs) { dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX)); BSONElement idElement = updatedDoc["_id"]; @@ -376,11 +411,19 @@ void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx, return; } - opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'u')); + if (opCtx->getTxnNumber()) { + opCtx->recoveryUnit()->registerChange( + new LogOpForShardingHandler(this, idElement.wrap(), 'u', oplogTs, prePostImageTs)); + } else { + opCtx->recoveryUnit()->registerChange( + new LogOpForShardingHandler(this, idElement.wrap(), 'u', {}, {})); + } } void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx, - const BSONObj& deletedDocId) { + const BSONObj& deletedDocId, + const Timestamp& oplogTs, + const Timestamp& preImageTs) { dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX)); BSONElement idElement = deletedDocId["_id"]; @@ -390,7 +433,13 @@ void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx, return; } - opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'd')); + if (opCtx->getTxnNumber()) { + opCtx->recoveryUnit()->registerChange( + new LogOpForShardingHandler(this, idElement.wrap(), 'd', oplogTs, preImageTs)); + } else { + opCtx->recoveryUnit()->registerChange( + new LogOpForShardingHandler(this, idElement.wrap(), 'd', {}, {})); + } } uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() { @@ -679,4 +728,27 @@ void MigrationChunkClonerSourceLegacy::_xfer(OperationContext* opCtx, arr.done(); } +void MigrationChunkClonerSourceLegacy::nextSessionMigrationBatch(OperationContext* opCtx, + BSONArrayBuilder* arrBuilder) { + while (_sessionCatalogSource.hasMoreOplog()) { + auto oplogDoc = 
_sessionCatalogSource.getLastFetchedOplog(); + + if (oplogDoc.isEmpty()) { + // Last fetched turned out empty, try to see if there are more + _sessionCatalogSource.fetchNextOplog(opCtx); + continue; + } + + // Use the builder size instead of accumulating the document sizes directly so that we + // take into consideration the overhead of BSONArray indices. + if (arrBuilder->arrSize() && + (arrBuilder->len() + oplogDoc.objsize() + 1024) > BSONObjMaxUserSize) { + break; + } + + arrBuilder->append(oplogDoc); + _sessionCatalogSource.fetchNextOplog(opCtx); + } +} + } // namespace mongo diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h index c072718475a..72d299b58e4 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h @@ -37,6 +37,7 @@ #include "mongo/db/query/plan_executor.h" #include "mongo/db/s/migration_chunk_cloner_source.h" #include "mongo/db/s/migration_session_id.h" +#include "mongo/db/s/session_catalog_migration_source.h" #include "mongo/s/move_chunk_request.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/stdx/memory.h" @@ -72,11 +73,19 @@ public: bool isDocumentInMigratingChunk(const BSONObj& doc) override; - void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc) override; + void onInsertOp(OperationContext* opCtx, + const BSONObj& insertedDoc, + const Timestamp& oplogTs) override; - void onUpdateOp(OperationContext* opCtx, const BSONObj& updatedDoc) override; + void onUpdateOp(OperationContext* opCtx, + const BSONObj& updatedDoc, + const Timestamp& oplogTs, + const Timestamp& prePostImageTs) override; - void onDeleteOp(OperationContext* opCtx, const BSONObj& deletedDocId) override; + void onDeleteOp(OperationContext* opCtx, + const BSONObj& deletedDocId, + const Timestamp& oplogTs, + const Timestamp& preImageTs) override; // Legacy cloner specific functionality @@ -121,6 +130,8 @@ public: */ Status nextModsBatch(OperationContext* opCtx, Database* db, BSONObjBuilder* builder); + void nextSessionMigrationBatch(OperationContext* opCtx, BSONArrayBuilder* arrBuilder); + private: friend class DeleteNotificationStage; friend class LogOpForShardingHandler; @@ -183,6 +194,8 @@ private: // during the cloning stage std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> _deleteNotifyExec; + SessionCatalogMigrationSource _sessionCatalogSource; + // Protects the entries below stdx::mutex _mutex; diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp index 9e3e83774ee..d42c6bdaa99 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp @@ -216,5 +216,57 @@ public: } transferModsCommand; +/** + * Command for extracting the oplog entries that needs to be migrated for the given migration + * session id. + * Note: this command is not stateless. Calling this command has a side-effect of gradually + * depleting the buffer that contains the oplog entries to be transfered. 
+ */ +class MigrateSessionCommand : public BasicCommand { +public: + MigrateSessionCommand() : BasicCommand("_getNextSessionMods") {} + + void help(std::stringstream& h) const { + h << "internal"; + } + + virtual bool supportsWriteConcern(const BSONObj& cmd) const override { + return false; + } + + virtual bool slaveOk() const { + return false; + } + + virtual bool adminOnly() const { + return true; + } + + virtual void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) { + ActionSet actions; + actions.addAction(ActionType::internal); + out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); + } + + bool run(OperationContext* opCtx, + const std::string&, + const BSONObj& cmdObj, + BSONObjBuilder& result) { + const MigrationSessionId migrationSessionId( + uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj))); + + BSONArrayBuilder arrBuilder; + + AutoGetActiveCloner autoCloner(opCtx, migrationSessionId); + autoCloner.getCloner()->nextSessionMigrationBatch(opCtx, &arrBuilder); + + result.appendArray("oplog", arrBuilder.arr()); + return true; + } + +} migrateSessionCommand; + } // namespace } // namespace mongo diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp index 193c237db5c..a15f6487a33 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp @@ -246,14 +246,14 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, CorrectDocumentsFetched) { WriteUnitOfWork wuow(operationContext()); - cloner.onInsertOp(operationContext(), createCollectionDocument(90)); - cloner.onInsertOp(operationContext(), createCollectionDocument(150)); - cloner.onInsertOp(operationContext(), createCollectionDocument(151)); - cloner.onInsertOp(operationContext(), createCollectionDocument(210)); - - cloner.onDeleteOp(operationContext(), createCollectionDocument(80)); - cloner.onDeleteOp(operationContext(), createCollectionDocument(199)); - cloner.onDeleteOp(operationContext(), createCollectionDocument(220)); + cloner.onInsertOp(operationContext(), createCollectionDocument(90), {}); + cloner.onInsertOp(operationContext(), createCollectionDocument(150), {}); + cloner.onInsertOp(operationContext(), createCollectionDocument(151), {}); + cloner.onInsertOp(operationContext(), createCollectionDocument(210), {}); + + cloner.onDeleteOp(operationContext(), createCollectionDocument(80), {}, {}); + cloner.onDeleteOp(operationContext(), createCollectionDocument(199), {}, {}); + cloner.onDeleteOp(operationContext(), createCollectionDocument(220), {}, {}); wuow.commit(); } diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index ca2c736a6b1..86688ba08ca 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -57,6 +57,8 @@ #include "mongo/db/s/sharding_state.h" #include "mongo/db/service_context.h" #include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/grid.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/stdx/chrono.h" #include "mongo/util/concurrency/notification.h" @@ -231,6 +233,8 @@ void MigrationDestinationManager::setStateFail(std::string msg) { _errmsg = std::move(msg); _state = FAIL; } + + _sessionMigration->forceFail(msg); } void MigrationDestinationManager::setStateFailWarn(std::string msg) { @@ -240,6 
+244,8 @@ void MigrationDestinationManager::setStateFailWarn(std::string msg) { _errmsg = std::move(msg); _state = FAIL; } + + _sessionMigration->forceFail(msg); } bool MigrationDestinationManager::isActive() const { @@ -334,6 +340,9 @@ Status MigrationDestinationManager::start(const NamespaceString& nss, _migrateThreadHandle.join(); } + _sessionMigration = + stdx::make_unique<SessionCatalogMigrationDestination>(fromShard, *_sessionId); + _migrateThreadHandle = stdx::thread([this, min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern]() { _migrateThread(min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern); @@ -397,6 +406,7 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio << _sessionId->toString()}; } + _sessionMigration->finish(); _state = COMMIT_START; auto const deadline = Date_t::now() + Seconds(30); @@ -411,6 +421,7 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio if (_state != DONE) { return {ErrorCodes::CommandFailed, "startCommit failed, final data failed to transfer"}; } + return Status::OK(); } @@ -672,6 +683,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, // 3. Initial bulk clone setState(CLONE); + _sessionMigration->start(opCtx->getServiceContext()); + const BSONObj migrateCloneRequest = createMigrateCloneRequest(_nss, *_sessionId); _chunkMarkedPending = true; // no lock needed, only the migrate thread looks. @@ -829,13 +842,15 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, break; } - sleepsecs(1); + opCtx->sleepFor(Seconds(1)); } if (t.minutes() >= 600) { setStateFail("Cannot go to critical section because secondaries cannot keep up"); return; } + + _sessionMigration->waitUntilReadyToCommit(opCtx); } { @@ -895,6 +910,12 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep5); } + _sessionMigration->join(); + if (_sessionMigration->getState() == SessionCatalogMigrationDestination::State::ErrorOccurred) { + setStateFail(_sessionMigration->getErrMsg()); + return; + } + setState(DONE); timing.done(6); diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h index 610f70196c1..49c6f1e5089 100644 --- a/src/mongo/db/s/migration_destination_manager.h +++ b/src/mongo/db/s/migration_destination_manager.h @@ -39,6 +39,7 @@ #include "mongo/db/s/active_migrations_registry.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/migration_session_id.h" +#include "mongo/db/s/session_catalog_migration_destination.h" #include "mongo/s/shard_id.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" @@ -211,6 +212,8 @@ private: State _state{READY}; std::string _errmsg; + + std::unique_ptr<SessionCatalogMigrationDestination> _sessionMigration; }; } // namespace mongo diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp index c9344031555..b1b373f17c5 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination.cpp @@ -201,7 +201,6 @@ BSONObj getNextSessionOplogBatch(OperationContext* opCtx, */ ProcessOplogResult processSessionOplog(OperationContext* opCtx, const BSONObj& oplogBSON, - // const Timestamp& prePostImageTs, const ProcessOplogResult& lastResult) { ProcessOplogResult result; auto oplogEntry = parseOplog(oplogBSON); @@ 
-240,50 +239,62 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx, const auto& sessionInfo = oplogEntry.getOperationSessionInfo(); result.sessionId = sessionInfo.getSessionId().value(); result.txnNum = sessionInfo.getTxnNumber().value(); + const auto stmtId = *oplogEntry.getStatementId(); auto scopedSession = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, result.sessionId); scopedSession->beginTxn(opCtx, result.txnNum); + if (scopedSession->checkStatementExecuted(opCtx, result.txnNum, stmtId)) { + return lastResult; + } + BSONObj object(result.isPrePostImage ? oplogEntry.getObject() : BSON(SessionCatalogMigrationDestination::kSessionMigrateOplogTag << 1)); auto oplogLink = extractPrePostImageTs(lastResult, oplogEntry); oplogLink.prevTs = scopedSession->getLastWriteOpTimeTs(result.txnNum); - writeConflictRetry(opCtx, - "SessionOplogMigration", - NamespaceString::kSessionTransactionsTableNamespace.ns(), - [&] { - Lock::GlobalLock globalLock(opCtx, MODE_IX, UINT_MAX); - WriteUnitOfWork wunit(opCtx); - - result.oplogTime = repl::logOp(opCtx, - "n", - oplogEntry.getNamespace(), - oplogEntry.getUuid(), - object, - &object2, - true, - sessionInfo, - *oplogEntry.getStatementId(), - oplogLink); - - auto oplogTs = result.oplogTime.getTimestamp(); - uassert(40633, - str::stream() - << "Failed to create new oplog entry for oplog with opTime: " - << oplogEntry.getOpTime().toString() - << ": " - << redact(oplogBSON), - !oplogTs.isNull()); - - if (!result.isPrePostImage) { - scopedSession->onWriteOpCompletedOnPrimary( - opCtx, result.txnNum, {*oplogEntry.getStatementId()}, oplogTs); - } - - wunit.commit(); - }); + writeConflictRetry( + opCtx, + "SessionOplogMigration", + NamespaceString::kSessionTransactionsTableNamespace.ns(), + [&] { + // Need to take global lock here so repl::logOp will not unlock it and trigger the + // invariant that disallows unlocking global lock while inside a WUOW. + // Grab a DBLock here instead of plain GlobalLock to make sure the MMAPV1 flush + // lock will be lock/unlocked correctly. Take the transaction table db lock to + // ensure the same lock ordering with normal replicated updates to the table. + Lock::DBLock lk( + opCtx, NamespaceString::kSessionTransactionsTableNamespace.db(), MODE_IX); + WriteUnitOfWork wunit(opCtx); + + result.oplogTime = repl::logOp(opCtx, + "n", + oplogEntry.getNamespace(), + oplogEntry.getUuid(), + object, + &object2, + true, + sessionInfo, + stmtId, + oplogLink); + + auto oplogTs = result.oplogTime.getTimestamp(); + uassert(40633, + str::stream() << "Failed to create new oplog entry for oplog with opTime: " + << oplogEntry.getOpTime().toString() + << ": " + << redact(oplogBSON), + !oplogTs.isNull()); + + // Do not call onWriteOpCompletedOnPrimary if we inserted a pre/post image, because + // the next oplog will contain the real operation. 
+ if (!result.isPrePostImage) { + scopedSession->onWriteOpCompletedOnPrimary(opCtx, result.txnNum, {stmtId}, oplogTs); + } + + wunit.commit(); + }); return result; } @@ -307,6 +318,7 @@ void SessionCatalogMigrationDestination::start(ServiceContext* service) { stdx::lock_guard<stdx::mutex> lk(_mutex); invariant(_state == State::NotStarted); _state = State::Migrating; + _isStateChanged.notify_all(); } _thread = stdx::thread(stdx::bind( @@ -316,6 +328,7 @@ void SessionCatalogMigrationDestination::start(ServiceContext* service) { void SessionCatalogMigrationDestination::finish() { stdx::lock_guard<stdx::mutex> lk(_mutex); _state = State::Committing; + _isStateChanged.notify_all(); } void SessionCatalogMigrationDestination::join() { @@ -343,8 +356,9 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service auto uniqueCtx = cc().makeOperationContext(); auto opCtx = uniqueCtx.get(); - // Timestamp prePostImageTs; + bool oplogDrainedAfterCommiting = false; ProcessOplogResult lastResult; + repl::OpTime lastOpTimeWaited; while (true) { { @@ -363,7 +377,16 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service { stdx::lock_guard<stdx::mutex> lk(_mutex); if (_state == State::Committing) { - break; + // The migration is considered done only when it gets an empty result from + // the source shard while this is in state committing. This is to make sure + // that it doesn't miss any new oplog created between the time window where + // this depleted the buffer from the source shard and receiving the commit + // command. + if (oplogDrainedAfterCommiting) { + break; + } + + oplogDrainedAfterCommiting = true; } } @@ -381,8 +404,17 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service // Note: only transition to "ready to commit" if state is not error/force stop. if (_state == State::Migrating) { _state = State::ReadyToCommit; + _isStateChanged.notify_all(); } } + + if (lastOpTimeWaited == lastResult.oplogTime) { + // We got an empty result at least twice in a row from the source shard so + // space it out a little bit so we don't hammer the shard. 
+ opCtx->sleepFor(Milliseconds(200)); + } + + lastOpTimeWaited = lastResult.oplogTime; } while (oplogIter.more()) { @@ -411,6 +443,7 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service { stdx::lock_guard<stdx::mutex> lk(_mutex); _state = State::Done; + _isStateChanged.notify_all(); } } @@ -423,6 +456,8 @@ void SessionCatalogMigrationDestination::_errorOccurred(StringData errMsg) { stdx::lock_guard<stdx::mutex> lk(_mutex); _state = State::ErrorOccurred; _errMsg = errMsg.toString(); + + _isStateChanged.notify_all(); } SessionCatalogMigrationDestination::State SessionCatalogMigrationDestination::getState() { @@ -430,4 +465,15 @@ SessionCatalogMigrationDestination::State SessionCatalogMigrationDestination::ge return _state; } +void SessionCatalogMigrationDestination::forceFail(std::string& errMsg) { + _errorOccurred(errMsg); +} + +void SessionCatalogMigrationDestination::waitUntilReadyToCommit(OperationContext* opCtx) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + while (_state == State::Migrating) { + opCtx->waitForConditionOrInterrupt(_isStateChanged, lk); + } +} + } // namespace mongo diff --git a/src/mongo/db/s/session_catalog_migration_destination.h b/src/mongo/db/s/session_catalog_migration_destination.h index 2e740f2f6bb..a67bd8012f6 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.h +++ b/src/mongo/db/s/session_catalog_migration_destination.h @@ -37,6 +37,7 @@ #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/s/migration_session_id.h" #include "mongo/s/shard_id.h" +#include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" #include "mongo/util/concurrency/with_lock.h" @@ -88,6 +89,17 @@ public: void join(); /** + * Forces this into an error state which will also stop session transfer thread. + */ + void forceFail(std::string& errMsg); + + /** + * Blocks until state changes is not Migrating. In other words, can return when state + * becomes ReadyToCommit/Done/ErrorOccurred, etc. + */ + void waitUntilReadyToCommit(OperationContext* opCtx); + + /** * Returns the current state. */ State getState(); @@ -109,6 +121,7 @@ private: // Protects _state and _errMsg. stdx::mutex _mutex; + stdx::condition_variable _isStateChanged; State _state = State::NotStarted; std::string _errMsg; // valid only if _state == ErrorOccurred. }; diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp index a1a76dfbd1d..71c63721c6f 100644 --- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp @@ -253,6 +253,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyWhenNothingToTr sessionMigration.start(getServiceContext()); sessionMigration.finish(); + // migration always fetches at least twice to transition from committing to done. + returnOplog({}); returnOplog({}); sessionMigration.join(); @@ -285,6 +287,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxn) { oplog3.setStatementId(5); returnOplog({oplog1, oplog2, oplog3}); + // migration always fetches at least twice to transition from committing to done. + returnOplog({}); returnOplog({}); sessionMigration.join(); @@ -336,6 +340,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldOnlyStoreHistoryOfLatestTxn oplog3.setStatementId(5); returnOplog({oplog1, oplog2, oplog3}); + // migration always fetches at least twice to transition from committing to done. 
+ returnOplog({}); returnOplog({}); sessionMigration.join(); @@ -380,6 +386,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxnInSeparate returnOplog({oplog1, oplog2}); returnOplog({oplog3}); + // migration always fetches at least twice to transition from committing to done. + returnOplog({}); returnOplog({}); sessionMigration.join(); @@ -433,6 +441,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession) oplog3.setStatementId(5); returnOplog({oplog1, oplog2, oplog3}); + // migration always fetches at least twice to transition from committing to done. + returnOplog({}); returnOplog({}); sessionMigration.join(); @@ -506,6 +516,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldNotNestAlreadyNestedOplog) oplog2.setStatementId(45); returnOplog({oplog1, oplog2}); + // migration always fetches at least twice to transition from committing to done. + returnOplog({}); returnOplog({}); sessionMigration.join(); @@ -556,6 +568,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindA updateOplog.setPreImageTs(Timestamp(100, 2)); returnOplog({preImageOplog, updateOplog}); + // migration always fetches at least twice to transition from committing to done. + returnOplog({}); returnOplog({}); sessionMigration.join(); @@ -645,6 +659,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFind updateOplog.setPostImageTs(Timestamp(100, 2)); returnOplog({postImageOplog, updateOplog}); + // migration always fetches at least twice to transition from committing to done. + returnOplog({}); returnOplog({}); sessionMigration.join(); @@ -735,6 +751,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModify returnOplog({preImageOplog}); returnOplog({updateOplog}); + // migration always fetches at least twice to transition from committing to done. + returnOplog({}); returnOplog({}); sessionMigration.join(); @@ -829,6 +847,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, OlderTxnShouldBeIgnored) { oplog2.setStatementId(45); returnOplog({oplog1, oplog2}); + // migration always fetches at least twice to transition from committing to done. + returnOplog({}); returnOplog({}); sessionMigration.join(); @@ -883,6 +903,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwritt oplog2.setStatementId(45); returnOplog({oplog2}); + // migration always fetches at least twice to transition from committing to done. + returnOplog({}); returnOplog({}); sessionMigration.join(); @@ -1048,6 +1070,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, oplog2.setStatementId(45); returnOplog({oplog2}); + // migration always fetches at least twice to transition from committing to done. 
+ returnOplog({}); returnOplog({}); sessionMigration.join(); @@ -1303,6 +1327,61 @@ TEST_F(SessionCatalogMigrationDestinationTest, ASSERT_FALSE(sessionMigration.getErrMsg().empty()); } +TEST_F(SessionCatalogMigrationDestinationTest, ShouldIgnoreAlreadyExecutedStatements) { + const NamespaceString kNs("a.b"); + const auto sessionId = makeLogicalSessionIdForTest(); + + auto opCtx = operationContext(); + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(sessionId); + sessionInfo.setTxnNumber(19); + + insertDocWithSessionInfo(sessionInfo, kNs, BSON("_id" << 46), 30); + + SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); + sessionMigration.start(getServiceContext()); + sessionMigration.finish(); + + OplogEntry oplog1( + OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100)); + oplog1.setOperationSessionInfo(sessionInfo); + oplog1.setStatementId(23); + + OplogEntry oplog2(OpTime(Timestamp(70, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80)); + oplog2.setOperationSessionInfo(sessionInfo); + oplog2.setStatementId(30); + + OplogEntry oplog3(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80)); + oplog3.setOperationSessionInfo(sessionInfo); + oplog3.setStatementId(45); + + returnOplog({oplog1, oplog2, oplog3}); + // migration always fetches at least twice to transition from committing to done. + returnOplog({}); + returnOplog({}); + + sessionMigration.join(); + + ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState()); + + auto session = getSessionWithTxn(opCtx, sessionId, 19); + TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(19)); + + ASSERT_TRUE(historyIter.hasNext()); + checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx)); + + ASSERT_TRUE(historyIter.hasNext()); + checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx)); + + ASSERT_TRUE(historyIter.hasNext()); + auto firstInsertOplog = historyIter.next(opCtx); + + ASSERT_TRUE(firstInsertOplog.getOpType() == OpTypeEnum::kInsert); + ASSERT_BSONOBJ_EQ(BSON("_id" << 46), firstInsertOplog.getObject()); + ASSERT_TRUE(firstInsertOplog.getStatementId()); + ASSERT_EQ(30, *firstInsertOplog.getStatementId()); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp index e79656ba94b..829d862b370 100644 --- a/src/mongo/db/s/session_catalog_migration_source.cpp +++ b/src/mongo/db/s/session_catalog_migration_source.cpp @@ -76,7 +76,6 @@ BSONObj SessionCatalogMigrationSource::getLastFetchedOplog() { { stdx::lock_guard<stdx::mutex> _lk(_newOplogMutex); - invariant(!_lastFetchedNewWriteOplog.isEmpty()); return _lastFetchedNewWriteOplog; } } @@ -101,11 +100,13 @@ bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationConte return false; } - _lastFetchedOplog = nextOplog.toBSON().getOwned(); - + auto nextOplogBSON = nextOplog.toBSON().getOwned(); auto doc = fetchPrePostImageOplog(opCtx, nextOplog); if (!doc.isEmpty()) { - _lastFetchedOplogBuffer.push_back(doc); + _lastFetchedOplogBuffer.push_back(nextOplogBSON); + _lastFetchedOplog = doc; + } else { + _lastFetchedOplog = nextOplogBSON; } return true; diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h index 42a3e71e372..5e44531d31c 100644 --- a/src/mongo/db/s/session_catalog_migration_source.h +++ b/src/mongo/db/s/session_catalog_migration_source.h @@ -68,6 +68,7 @@ 
public: /** * Returns the oplog document that was last fetched by the fetchNextOplog call. + * Returns an empty object if there are no oplog entries to fetch. */ BSONObj getLastFetchedOplog(); diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp index ad4942292ea..ff43cce5d0b 100644 --- a/src/mongo/db/s/session_catalog_migration_source_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp @@ -216,7 +216,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd SessionCatalogMigrationSource migrationSource(kNs); ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); - auto expectedSequece = {entry4.toBSON(), entry3.toBSON(), entry2.toBSON(), entry1.toBSON()}; + auto expectedSequece = {entry3.toBSON(), entry4.toBSON(), entry1.toBSON(), entry2.toBSON()}; for (auto oplogDoc : expectedSequece) { ASSERT_TRUE(migrationSource.hasMoreOplog());
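On the recipient side, SessionCatalogMigrationDestination keeps pulling batches from the donor's _getNextSessionMods command and, per the comments added in the tests above, only transitions from Committing to Done after it has seen an additional empty batch while already committing — which is why every updated test now calls returnOplog({}) twice after finish(). Below is a hedged sketch of that drain protocol under simplified assumptions: FakeDonor, getNextSessionMods(), drainSessionOplog() and the plain-string "entries" stand in for the real command round-trip and parsed oplog documents, and the State enum collapses the class's larger state machine.

```cpp
#include <deque>
#include <iostream>
#include <string>
#include <vector>

// Illustrative stand-in for the donor side of _getNextSessionMods: returns whatever session
// oplog entries are currently buffered, possibly an empty batch once the buffer is drained.
class FakeDonor {
public:
    explicit FakeDonor(std::deque<std::vector<std::string>> batches) : _batches(std::move(batches)) {}

    std::vector<std::string> getNextSessionMods() {
        if (_batches.empty())
            return {};
        auto batch = std::move(_batches.front());
        _batches.pop_front();
        return batch;
    }

private:
    std::deque<std::vector<std::string>> _batches;
};

enum class State { Migrating, Committing, Done };

// Sketch of the destination drain loop: once the state is Committing, the first empty fetch only
// arms a flag (like oplogDrainedAfterCommiting in the diff); the next empty fetch ends the
// transfer, so entries written between the last fetch and the commit signal are not missed.
void drainSessionOplog(FakeDonor& donor, State& state, std::vector<std::string>& applied) {
    bool drainedAfterCommitting = false;
    while (true) {
        auto batch = donor.getNextSessionMods();
        if (batch.empty()) {
            if (state == State::Committing) {
                if (drainedAfterCommitting)
                    break;
                drainedAfterCommitting = true;
            }
            continue;  // the real thread backs off / waits here instead of spinning
        }
        for (const auto& entry : batch)
            applied.push_back(entry);  // the real code re-logs each entry as a no-op oplog entry
    }
    state = State::Done;
}

int main() {
    // One batch before the commit signal is processed, an empty fetch, a straggler, then dry.
    FakeDonor donor({{"stmt23", "stmt30"}, {}, {"stmt45"}, {}});
    State state = State::Committing;  // finish() has already been called on the destination
    std::vector<std::string> applied;
    drainSessionOplog(donor, state, applied);
    std::cout << "applied " << applied.size() << " session oplog entries\n";  // 3
}
```

The sketch only models the termination condition. In the real destination each fetched entry is also written back as a no-op oplog entry via repl::logOp with its statement id recorded on the session (what processSessionOplog does in the diff), and statements that were already executed for the same transaction number are skipped via checkStatementExecuted.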