diff options
-rw-r--r-- | src/mongo/db/s/collection_sharding_runtime.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/s/collection_sharding_runtime.h | 6 | ||||
-rw-r--r-- | src/mongo/db/s/metadata_manager.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/s/metadata_manager.h | 5 | ||||
-rw-r--r-- | src/mongo/db/s/range_deleter_service_op_observer.cpp | 43 |
5 files changed, 57 insertions, 18 deletions
diff --git a/src/mongo/db/s/collection_sharding_runtime.cpp b/src/mongo/db/s/collection_sharding_runtime.cpp index a46fe3bdc66..664542076c5 100644 --- a/src/mongo/db/s/collection_sharding_runtime.cpp +++ b/src/mongo/db/s/collection_sharding_runtime.cpp @@ -329,6 +329,17 @@ Status CollectionShardingRuntime::waitForClean(OperationContext* opCtx, MONGO_UNREACHABLE; } +SharedSemiFuture<void> CollectionShardingRuntime::getOngoingQueriesCompletionFuture( + const UUID& collectionUuid, ChunkRange const& range) { + stdx::lock_guard lk(_metadataManagerLock); + + if (!_metadataManager || _metadataManager->getCollectionUuid() != collectionUuid) { + return SemiFuture<void>::makeReady().share(); + } + return _metadataManager->getOngoingQueriesCompletionFuture(range); +} + + std::shared_ptr<ScopedCollectionDescription::Impl> CollectionShardingRuntime::_getCurrentMetadataIfKnown( const boost::optional<LogicalTime>& atClusterTime) { diff --git a/src/mongo/db/s/collection_sharding_runtime.h b/src/mongo/db/s/collection_sharding_runtime.h index 94db822cf92..f8d390a9f3d 100644 --- a/src/mongo/db/s/collection_sharding_runtime.h +++ b/src/mongo/db/s/collection_sharding_runtime.h @@ -187,6 +187,12 @@ public: ChunkRange orphanRange, Date_t deadline); + /** + * Returns a future marked as ready when all the ongoing queries retaining the range complete + */ + SharedSemiFuture<void> getOngoingQueriesCompletionFuture(const UUID& collectionUuid, + ChunkRange const& range); + std::uint64_t getNumMetadataManagerChanges_forTest() { return _numMetadataManagerChanges; } diff --git a/src/mongo/db/s/metadata_manager.cpp b/src/mongo/db/s/metadata_manager.cpp index 7b1a0d29cb4..4c2ffcea0d9 100644 --- a/src/mongo/db/s/metadata_manager.cpp +++ b/src/mongo/db/s/metadata_manager.cpp @@ -341,6 +341,16 @@ boost::optional<SharedSemiFuture<void>> MetadataManager::trackOrphanedDataCleanu return boost::none; } +SharedSemiFuture<void> MetadataManager::getOngoingQueriesCompletionFuture(ChunkRange const& range) { + stdx::lock_guard<Latch> lg(_managerLock); + + auto* const overlapMetadata = _findNewestOverlappingMetadata(lg, range); + if (!overlapMetadata) { + return SemiFuture<void>::makeReady().share(); + } + return overlapMetadata->onDestructionPromise.getFuture(); +} + auto MetadataManager::_findNewestOverlappingMetadata(WithLock, ChunkRange const& range) -> CollectionMetadataTracker* { invariant(!_metadata.empty()); diff --git a/src/mongo/db/s/metadata_manager.h b/src/mongo/db/s/metadata_manager.h index 15717500d91..dfef01234ba 100644 --- a/src/mongo/db/s/metadata_manager.h +++ b/src/mongo/db/s/metadata_manager.h @@ -149,6 +149,11 @@ public: boost::optional<SharedSemiFuture<void>> trackOrphanedDataCleanup( ChunkRange const& orphans) const; + /** + * Returns a future marked as ready when all the ongoing queries retaining the range complete + */ + SharedSemiFuture<void> getOngoingQueriesCompletionFuture(ChunkRange const& range); + private: // Management of the _metadata list is implemented in RangePreserver friend class RangePreserver; diff --git a/src/mongo/db/s/range_deleter_service_op_observer.cpp b/src/mongo/db/s/range_deleter_service_op_observer.cpp index 3326d6c4b89..1ca0fb3d02d 100644 --- a/src/mongo/db/s/range_deleter_service_op_observer.cpp +++ b/src/mongo/db/s/range_deleter_service_op_observer.cpp @@ -29,7 +29,9 @@ #include "mongo/db/s/range_deleter_service_op_observer.h" +#include "mongo/db/catalog_raii.h" #include "mongo/db/persistent_task_store.h" +#include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/range_deleter_service.h" #include "mongo/db/s/range_deletion_task_gen.h" #include "mongo/db/update/update_oplog_entry_serialization.h" @@ -38,6 +40,27 @@ namespace mongo { namespace { // Small hack used to be able to retrieve the full removed document in the `onDelete` method const auto deletedDocumentDecoration = OperationContext::declareDecoration<BSONObj>(); +void registerTaskWithOngoingQueriesOnOpLogEntryCommit(OperationContext* opCtx, + const RangeDeletionTask& rdt) { + + opCtx->recoveryUnit()->onCommit([opCtx, rdt](boost::optional<Timestamp>) { + try { + AutoGetCollection autoColl(opCtx, rdt.getNss(), MODE_IS); + auto waitForActiveQueriesToComplete = + CollectionShardingRuntime::get(opCtx, rdt.getNss()) + ->getOngoingQueriesCompletionFuture(rdt.getCollectionUuid(), rdt.getRange()) + .semi(); + (void)RangeDeleterService::get(opCtx)->registerTask( + rdt, std::move(waitForActiveQueriesToComplete)); + } catch (const DBException& ex) { + dassert(ex.code() == ErrorCodes::NotYetInitialized, + str::stream() << "No error different from `NotYetInitialized` is expected " + "to be propagated to the range deleter observer. Got error: " + << ex.toStatus()); + } + }); +} + } // namespace RangeDeleterServiceOpObserver::RangeDeleterServiceOpObserver() = default; @@ -53,15 +76,7 @@ void RangeDeleterServiceOpObserver::onInserts(OperationContext* opCtx, auto deletionTask = RangeDeletionTask::parse( IDLParserContext("RangeDeleterServiceOpObserver"), it->doc); if (!deletionTask.getPending() || !*(deletionTask.getPending())) { - try { - (void)RangeDeleterService::get(opCtx)->registerTask(deletionTask); - } catch (const DBException& ex) { - dassert(ex.code() == ErrorCodes::NotYetInitialized, - str::stream() - << "No error different from `NotYetInitialized` is expected " - "to be propagated to the range deleter observer. Got error: " - << ex.toStatus()); - } + registerTaskWithOngoingQueriesOnOpLogEntryCommit(opCtx, deletionTask); } } } @@ -85,15 +100,7 @@ void RangeDeleterServiceOpObserver::onUpdate(OperationContext* opCtx, if (pendingFieldIsRemoved || pendingFieldUpdatedToFalse) { auto deletionTask = RangeDeletionTask::parse( IDLParserContext("RangeDeleterServiceOpObserver"), args.updateArgs->updatedDoc); - try { - (void)RangeDeleterService::get(opCtx)->registerTask(deletionTask); - } catch (const DBException& ex) { - dassert(ex.code() == ErrorCodes::NotYetInitialized, - str::stream() - << "No error different from `NotYetInitialized` is expected " - "to be propagated to the range deleter observer. Got error: " - << ex.toStatus()); - } + registerTaskWithOngoingQueriesOnOpLogEntryCommit(opCtx, deletionTask); } } } |