6 files changed, 188 insertions, 24 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index f9018ac6cfc..393a99c00fa 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -292,8 +292,14 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate(
 StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::commitClone(OperationContext* opCtx) {
     invariant(_state == kCloning);
     invariant(!opCtx->lockState()->isLocked());
+
+    if (_sessionCatalogSource) {
+        _sessionCatalogSource->onCommitCloneStarted();
+    }
+
     auto responseStatus =
         _callRecipient(createRequestWithSessionId(kRecvChunkCommit, _args.getNss(), _sessionId));
+
     if (responseStatus.isOK()) {
         _cleanup(opCtx);
@@ -313,6 +319,10 @@ StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::commitClone(OperationConte
 void MigrationChunkClonerSourceLegacy::cancelClone(OperationContext* opCtx) {
     invariant(!opCtx->lockState()->isLocked());
 
+    if (_sessionCatalogSource) {
+        _sessionCatalogSource->onCloneCleanup();
+    }
+
     switch (_state) {
         case kDone:
             break;
@@ -788,4 +798,13 @@ boost::optional<repl::OpTime> MigrationChunkClonerSourceLegacy::nextSessionMigra
     return boost::make_optional(opTimeToWaitIfWaitingForMajority);
 }
 
+std::shared_ptr<Notification<bool>>
+MigrationChunkClonerSourceLegacy::getNotificationForNextSessionMigrationBatch() {
+    if (!_sessionCatalogSource) {
+        return nullptr;
+    }
+
+    return _sessionCatalogSource->getNotificationForNewOplog();
+}
+
 }  // namespace mongo
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index 3440f7d79ea..aa8fda0530c 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -164,6 +164,16 @@ public:
     boost::optional<repl::OpTime> nextSessionMigrationBatch(OperationContext* opCtx,
                                                             BSONArrayBuilder* arrBuilder);
 
+    /**
+     * Returns a notification that can be used to wait for new oplog entries that need to be
+     * migrated. If the value in the notification returns true, it means that there are no more
+     * new batches that need to be fetched because the migration has already entered the
+     * critical section or aborted.
+     *
+     * Returns nullptr if there is no session migration associated with this migration.
+     */
+    std::shared_ptr<Notification<bool>> getNotificationForNextSessionMigrationBatch();
+
 private:
     friend class LogOpForShardingHandler;
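An aside on the primitive these changes lean on: the header above hands out a std::shared_ptr<Notification<bool>>. If mongo/util/concurrency/notification.h is unfamiliar, the semantics the patch relies on (a value that is set at most once, and a get() that blocks until it is set) can be modeled in a few lines of standard C++. SimpleNotification below is a hypothetical stand-in, not MongoDB's class; among other things it omits the OperationContext-aware, interruptible wait of the real one.

#include <condition_variable>
#include <mutex>
#include <optional>

// Hypothetical stand-in for mongo::Notification<T>: the value is set at most
// once, and get() blocks until some thread has set it.
template <typename T>
class SimpleNotification {
public:
    void set(T value) {
        std::lock_guard<std::mutex> lk(_mutex);
        _value = std::move(value);  // must only ever be called once
        _condVar.notify_all();
    }

    T get() {
        std::unique_lock<std::mutex> lk(_mutex);
        _condVar.wait(lk, [&] { return _value.has_value(); });
        return *_value;
    }

private:
    std::mutex _mutex;
    std::condition_variable _condVar;
    std::optional<T> _value;
};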
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
index 6f243ebdd68..76367397e50 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
@@ -259,25 +259,37 @@ public:
         out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
     }
 
-    bool run(OperationContext* opCtx,
-             const std::string&,
-             const BSONObj& cmdObj,
-             BSONObjBuilder& result) {
-        const MigrationSessionId migrationSessionId(
-            uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj)));
+    /**
+     * Fetches the next batch of oplog entries that need to be transferred and appends it to the
+     * given array builder. If it was not able to fetch anything, it will return a non-null
+     * notification that will get signalled when new batches come in or when the migration is
+     * over. If the boolean value from the notification returns true, then the migration has
+     * entered the critical section or aborted and there are no more new batches to fetch.
+     */
+    std::shared_ptr<Notification<bool>> fetchNextSessionMigrationBatch(
+        OperationContext* opCtx,
+        const MigrationSessionId& migrationSessionId,
+        BSONArrayBuilder* arrBuilder) {
+        boost::optional<repl::OpTime> opTime;
+        std::shared_ptr<Notification<bool>> newOplogNotification;
 
-        BSONArrayBuilder arrBuilder;
+        writeConflictRetry(
+            opCtx,
+            "Fetching session related oplogs for migration",
+            NamespaceString::kRsOplogNamespace.ns(),
+            [&]() {
+                AutoGetActiveCloner autoCloner(opCtx, migrationSessionId);
+                opTime = autoCloner.getCloner()->nextSessionMigrationBatch(opCtx, arrBuilder);
 
-        boost::optional<repl::OpTime> opTime;
+                if (arrBuilder->arrSize() == 0) {
+                    newOplogNotification =
+                        autoCloner.getCloner()->getNotificationForNextSessionMigrationBatch();
+                }
+            });
 
-        writeConflictRetry(opCtx,
-                           "Fetching session related oplogs for migration",
-                           NamespaceString::kRsOplogNamespace.ns(),
-                           [&]() {
-                               AutoGetActiveCloner autoCloner(opCtx, migrationSessionId);
-                               opTime = autoCloner.getCloner()->nextSessionMigrationBatch(
-                                   opCtx, &arrBuilder);
-                           });
+        if (newOplogNotification) {
+            return newOplogNotification;
+        }
 
         // If the batch returns something, we wait for write concern to ensure that all the entries
         // in the batch have been majority committed. We then need to check that the rollback id
@@ -305,7 +317,38 @@ public:
                 rollbackId == rollbackIdAtMigrationInit);
         }
 
+        return nullptr;
+    }
+
+    bool run(OperationContext* opCtx,
+             const std::string&,
+             const BSONObj& cmdObj,
+             BSONObjBuilder& result) {
+        const MigrationSessionId migrationSessionId(
+            uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj)));
+
+        BSONArrayBuilder arrBuilder;
+        bool hasMigrationCompleted = false;
+
+        do {
+            if (auto newOplogNotification =
+                    fetchNextSessionMigrationBatch(opCtx, migrationSessionId, &arrBuilder)) {
+                hasMigrationCompleted = newOplogNotification->get(opCtx);
+            } else if (arrBuilder.arrSize() == 0) {
+                // If we didn't get a notification and the arrBuilder is empty, that means
+                // that the sessionMigration is not active for this migration (most likely
+                // because it's not a replica set).
+                hasMigrationCompleted = true;
+            }
+        } while (arrBuilder.arrSize() == 0 && !hasMigrationCompleted);
+
         result.appendArray("oplog", arrBuilder.arr());
+
+        // TODO: SERVER-40187 remove after v4.2. This indicates to the caller that this server
+        // waits for notification of new oplog entries to send over, so the caller doesn't need
+        // to throttle.
+        result.append("waitsForNewOplog", true);
+
         return true;
     }
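The effect of the rework above is that _getNextSessionOplogBatch turns into a long poll: when the batch comes back empty, run() blocks on the notification instead of returning an empty reply, and the boolean it eventually reads decides between retrying the fetch (false) and treating the migration as over (true). A minimal sketch of that handoff, reusing the SimpleNotification stand-in from earlier (the sleep and the printed strings are purely illustrative):

#include <chrono>
#include <iostream>
#include <memory>
#include <thread>

// One thread blocks the way run() blocks in newOplogNotification->get(opCtx).
// The other thread plays the signalling side: notifyNewWriteOpTime() sets
// false ("more data is coming, fetch again"), while onCommitCloneStarted()
// and onCloneCleanup() set true ("stop fetching").
int main() {
    auto notification = std::make_shared<SimpleNotification<bool>>();

    std::thread waiter([notification] {
        const bool hasMigrationCompleted = notification->get();  // blocks here
        std::cout << (hasMigrationCompleted ? "done, reply with empty batch\n"
                                            : "retry the fetch\n");
    });

    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    notification->set(false);  // e.g. a new retryable write arrived

    waiter.join();
    return 0;
}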
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index ea33491350d..a057b9e7aae 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -54,6 +54,7 @@ namespace mongo {
 namespace {
 
 const auto kOplogField = "oplog";
+const auto kWaitsForNewOplogField = "waitsForNewOplog";
 const WriteConcernOptions kMajorityWC(WriteConcernOptions::kMajority,
                                       WriteConcernOptions::SyncMode::UNSET,
                                       Milliseconds(0));
@@ -419,6 +420,7 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
 
         nextBatch = getNextSessionOplogBatch(opCtx, _fromShard, _migrationSessionId);
         oplogArray = BSONArray{nextBatch[kOplogField].Obj()};
+        const auto donorWaitsForNewOplog = nextBatch[kWaitsForNewOplogField].trueValue();
 
         if (oplogArray.isEmpty()) {
             {
@@ -451,10 +453,12 @@
                 }
             }
 
-            if (lastOpTimeWaited == lastResult.oplogTime) {
+            // TODO: SERVER-40187 Completely remove after v4.2. donorWaitsForNewOplog is a
+            // 'feature flag' indicating that the donor does block until there are new oplog
+            // entries to return, so we don't need to sleep here.
+            if (!donorWaitsForNewOplog && lastOpTimeWaited == lastResult.oplogTime) {
                 // We got an empty result at least twice in a row from the source shard so space
-                // it
-                // out a little bit so we don't hammer the shard
+                // it out a little bit so we don't hammer the shard
                 opCtx->sleepFor(Milliseconds(200));
             }
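The recipient-side change above boils down to one predicate. Restated as a hypothetical helper (shouldBackOff does not exist in the patch), the mixed-version behavior is easy to read off: only a donor that does not advertise waitsForNewOplog, and that has produced two consecutive empty batches at the same optime, triggers the 200 ms backoff; a donor that long-polls server-side never does.

// Hypothetical restatement of the throttle condition; OpTime is any type
// with equality, standing in for repl::OpTime.
template <typename OpTime>
bool shouldBackOff(bool donorWaitsForNewOplog,
                   const OpTime& lastOpTimeWaited,
                   const OpTime& lastResultOplogTime) {
    // Old donors return empty batches immediately, so the recipient must
    // pace itself; new donors block server-side, so backing off only adds
    // latency.
    return !donorWaitsForNewOplog && lastOpTimeWaited == lastResultOplogTime;
}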
diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp
index 8eddb20b931..3d37920e59c 100644
--- a/src/mongo/db/s/session_catalog_migration_source.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source.cpp
@@ -165,7 +165,32 @@ SessionCatalogMigrationSource::SessionCatalogMigrationSource(OperationContext* o
 }
 
 bool SessionCatalogMigrationSource::hasMoreOplog() {
-    return _hasMoreOplogFromSessionCatalog() || _hasNewWrites();
+    if (_hasMoreOplogFromSessionCatalog()) {
+        return true;
+    }
+
+    stdx::lock_guard<stdx::mutex> lk(_newOplogMutex);
+    return _hasNewWrites(lk);
+}
+
+void SessionCatalogMigrationSource::onCommitCloneStarted() {
+    stdx::lock_guard<stdx::mutex> _lk(_newOplogMutex);
+
+    _state = State::kCommitStarted;
+    if (_newOplogNotification) {
+        _newOplogNotification->set(true);
+        _newOplogNotification.reset();
+    }
+}
+
+void SessionCatalogMigrationSource::onCloneCleanup() {
+    stdx::lock_guard<stdx::mutex> _lk(_newOplogMutex);
+
+    _state = State::kCleanup;
+    if (_newOplogNotification) {
+        _newOplogNotification->set(true);
+        _newOplogNotification.reset();
+    }
 }
 
 SessionCatalogMigrationSource::OplogResult SessionCatalogMigrationSource::getLastFetchedOplog() {
@@ -190,6 +215,31 @@ bool SessionCatalogMigrationSource::fetchNextOplog(OperationContext* opCtx) {
     return _fetchNextNewWriteOplog(opCtx);
 }
 
+std::shared_ptr<Notification<bool>> SessionCatalogMigrationSource::getNotificationForNewOplog() {
+    invariant(!_hasMoreOplogFromSessionCatalog());
+
+    stdx::lock_guard<stdx::mutex> lk(_newOplogMutex);
+
+    if (_newOplogNotification) {
+        return _newOplogNotification;
+    }
+
+    auto notification = std::make_shared<Notification<bool>>();
+    if (_state == State::kCleanup) {
+        notification->set(true);
+    }
+    // Even if commit has started, we still need to drain the current buffer.
+    else if (_hasNewWrites(lk)) {
+        notification->set(false);
+    } else if (_state == State::kCommitStarted) {
+        notification->set(true);
+    } else {
+        _newOplogNotification = notification;
+    }
+
+    return notification;
+}
+
 bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationContext* opCtx) {
     if (_currentOplogIterator) {
         if (_currentOplogIterator->hasNext()) {
@@ -254,8 +304,7 @@ bool SessionCatalogMigrationSource::_fetchNextOplogFromSessionCatalog(OperationC
     return false;
 }
 
-bool SessionCatalogMigrationSource::_hasNewWrites() {
-    stdx::lock_guard<stdx::mutex> lk(_newOplogMutex);
+bool SessionCatalogMigrationSource::_hasNewWrites(WithLock) {
     return _lastFetchedNewWriteOplog || !_newWriteOpTimeList.empty();
 }
 
@@ -296,6 +345,11 @@ bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* op
 void SessionCatalogMigrationSource::notifyNewWriteOpTime(repl::OpTime opTime) {
     stdx::lock_guard<stdx::mutex> lk(_newOplogMutex);
     _newWriteOpTimeList.push_back(opTime);
+
+    if (_newOplogNotification) {
+        _newOplogNotification->set(false);
+        _newOplogNotification.reset();
+    }
 }
 
 SessionCatalogMigrationSource::SessionOplogIterator::SessionOplogIterator(
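The branch ordering in getNotificationForNewOplog() above encodes a small state machine, and the subtle point is that a non-empty buffer wins over kCommitStarted: the recipient keeps draining buffered writes even after the critical section begins. A side-effect-free restatement (immediateValue is a hypothetical helper, not part of the patch) of what value, if any, the returned notification is pre-set with:

#include <optional>

enum class State { kActive, kCommitStarted, kCleanup };

// true     -> no more oplog entries will ever come; stop fetching
// false    -> entries are already buffered; fetch them now
// nullopt  -> nothing to report yet; the notification stays pending until
//             notifyNewWriteOpTime(), onCommitCloneStarted() or
//             onCloneCleanup() resolves it.
std::optional<bool> immediateValue(State state, bool hasBufferedWrites) {
    if (state == State::kCleanup)
        return true;           // migration aborted or already cleaned up
    if (hasBufferedWrites)
        return false;          // drain the buffer even if commit has started
    if (state == State::kCommitStarted)
        return true;           // commit started with nothing left to drain
    return std::nullopt;       // still active and idle: park the request
}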
diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h
index 04e1fc00132..b866e89e84a 100644
--- a/src/mongo/db/s/session_catalog_migration_source.h
+++ b/src/mongo/db/s/session_catalog_migration_source.h
@@ -39,6 +39,7 @@
 #include "mongo/db/session_txn_record_gen.h"
 #include "mongo/db/transaction_history_iterator.h"
 #include "mongo/stdx/mutex.h"
+#include "mongo/util/concurrency/notification.h"
 #include "mongo/util/concurrency/with_lock.h"
 
 namespace mongo {
@@ -100,6 +101,17 @@ public:
     bool fetchNextOplog(OperationContext* opCtx);
 
     /**
+     * Returns a notification that can be used to wait for new oplog entries to fetch. Note
+     * that this should only be called if hasMoreOplog/fetchNextOplog returned false at
+     * least once.
+     *
+     * If the notification is set to true, there is no longer a need to fetch more oplog
+     * entries: either the data migration has entered the critical section and the buffer of
+     * oplog entries to fetch is empty, or the data migration has aborted.
+     */
+    std::shared_ptr<Notification<bool>> getNotificationForNewOplog();
+
+    /**
      * Returns the oplog document that was last fetched by the fetchNextOplog call.
      * Returns an empty object if there are no oplog to fetch.
      */
@@ -117,6 +129,18 @@ public:
         return _rollbackIdAtInit;
     }
 
+    /**
+     * Informs this session migration machinery that the data migration just entered the
+     * critical section.
+     */
+    void onCommitCloneStarted();
+
+    /**
+     * Informs this session migration machinery that the data migration just terminated and is
+     * entering the cleanup phase (it can have been either aborted or committed).
+     */
+    void onCloneCleanup();
+
 private:
     /**
      * An iterator for extracting session write oplogs that need to be cloned during migration.
@@ -148,6 +172,8 @@ private:
         std::unique_ptr<TransactionHistoryIterator> _writeHistoryIterator;
     };
 
+    enum class State { kActive, kCommitStarted, kCleanup };
+
     ///////////////////////////////////////////////////////////////////////////
     // Methods for extracting the oplog entries from session information.
 
@@ -178,7 +204,7 @@ private:
     /**
     * Returns true if there are oplog generated by new writes that needs to be fetched.
     */
-    bool _hasNewWrites();
+    bool _hasNewWrites(WithLock);
 
     /**
    * Attempts to fetch the next oplog entry from the new writes that was saved by saveNewWriteTS.
@@ -211,7 +237,7 @@ private:
     // Used to store the last fetched oplog. This enables calling get multiple times.
     boost::optional<repl::OplogEntry> _lastFetchedOplog;
 
-    // Protects _newWriteTsList, _lastFetchedNewWriteOplog
+    // Protects _newWriteTsList, _lastFetchedNewWriteOplog, _state, _newOplogNotification
     stdx::mutex _newOplogMutex;
 
     // Stores oplog opTime of new writes that are coming in.
@@ -219,6 +245,14 @@ private:
 
     // Used to store the last fetched oplog from _newWriteTsList.
     boost::optional<repl::OplogEntry> _lastFetchedNewWriteOplog;
+
+    // Stores the current state.
+    State _state{State::kActive};
+
+    // Holds the latest request for notification of new oplog entries that need to be fetched.
+    // Set to true if there is no need to fetch more oplog entries (for example, because the
+    // migration aborted).
+    std::shared_ptr<Notification<bool>> _newOplogNotification;
 };
 
 }  // namespace mongo