diff options
author | Amirsaman Memaripour <amirsaman.memaripour@mongodb.com> | 2021-10-15 15:57:16 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-10-15 16:32:33 +0000 |
commit | 2bf37b1ac522b39e186388ea382a5a14c0636da6 (patch) | |
tree | f00fef81a0aed90aa1d8f3776952a31d1535d1cb | |
parent | 7af0dfde3735b90d6e5467f45077152f27c77584 (diff) | |
download | mongo-2bf37b1ac522b39e186388ea382a5a14c0636da6.tar.gz |
SERVER-57784 Shutdown and join executors before returning from ReshardingOplogApplier tests
-rw-r--r-- | src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp | 102 |
1 files changed, 57 insertions, 45 deletions
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp index f1da6eb561f..aead93e4313 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp @@ -158,6 +158,22 @@ public: _metrics->onStart(ReshardingMetrics::Role::kRecipient, getServiceContext()->getFastClockSource()->now()); _metrics->setRecipientState(RecipientStateEnum::kApplying); + + _executor = makeTaskExecutorForApplier(); + _executor->startup(); + + _cancelableOpCtxExecutor = makeExecutorForCancelableOpCtx(); + _cancelableOpCtxExecutor->startup(); + } + + void tearDown() { + _executor->shutdown(); + _executor->join(); + + _cancelableOpCtxExecutor->shutdown(); + _cancelableOpCtxExecutor->join(); + + ShardingMongodTestFixture::tearDown(); } ChunkManager createChunkManagerForOriginalColl() { @@ -269,6 +285,15 @@ public: return bob.obj()["oplogEntriesApplied"_sd].Long(); } + std::shared_ptr<executor::ThreadPoolTaskExecutor> getExecutor() { + return _executor; + } + + + std::shared_ptr<ThreadPool> getCancelableOpCtxExecutor() { + return _cancelableOpCtxExecutor; + } + protected: auto makeApplierEnv() { return std::make_unique<ReshardingOplogApplier::Env>(getServiceContext(), &*_metrics); @@ -301,20 +326,17 @@ protected: executor::makeNetworkInterface( "TestReshardOplogApplicationNetwork", nullptr, std::move(hookList))); - executor->startup(); return executor; } - CancelableOperationContextFactory makeCancelableOpCtxForApplier(CancellationToken cancelToken) { - auto executor = std::make_shared<ThreadPool>([] { + std::shared_ptr<ThreadPool> makeExecutorForCancelableOpCtx() { + return std::make_shared<ThreadPool>([] { ThreadPool::Options options; options.poolName = "TestReshardOplogApplierCancelableOpCtxPool"; options.minThreads = 1; options.maxThreads = 1; return options; }()); - - return CancelableOperationContextFactory(cancelToken, executor); } static constexpr int kWriterPoolSize = 4; @@ -331,6 +353,9 @@ protected: const ReshardingSourceId _sourceId{UUID::gen(), kMyShardId}; std::unique_ptr<ReshardingMetrics> _metrics; + + std::shared_ptr<executor::ThreadPoolTaskExecutor> _executor; + std::shared_ptr<ThreadPool> _cancelableOpCtxExecutor; }; TEST_F(ReshardingOplogApplierTest, NothingToIterate) { @@ -338,7 +363,6 @@ TEST_F(ReshardingOplogApplierTest, NothingToIterate) { auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps), 2 /* batchSize */); boost::optional<ReshardingOplogApplier> applier; - auto executor = makeTaskExecutorForApplier(); applier.emplace(makeApplierEnv(), sourceId(), @@ -349,8 +373,8 @@ TEST_F(ReshardingOplogApplierTest, NothingToIterate) { std::move(iterator)); auto cancelToken = operationContext()->getCancellationToken(); - auto factory = makeCancelableOpCtxForApplier(cancelToken); - auto future = applier->run(executor, executor, cancelToken, factory); + CancelableOperationContextFactory factory(cancelToken, getCancelableOpCtxExecutor()); + auto future = applier->run(getExecutor(), getExecutor(), cancelToken, factory); ASSERT_OK(future.getNoThrow()); } @@ -375,7 +399,6 @@ TEST_F(ReshardingOplogApplierTest, ApplyBasicCrud) { auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps), 2 /* batchSize */); boost::optional<ReshardingOplogApplier> applier; - auto executor = makeTaskExecutorForApplier(); applier.emplace(makeApplierEnv(), sourceId(), appliedToNs(), @@ -385,8 +408,8 @@ TEST_F(ReshardingOplogApplierTest, ApplyBasicCrud) { std::move(iterator)); auto cancelToken = operationContext()->getCancellationToken(); - auto factory = makeCancelableOpCtxForApplier(cancelToken); - auto future = applier->run(executor, executor, cancelToken, factory); + CancelableOperationContextFactory factory(cancelToken, getCancelableOpCtxExecutor()); + auto future = applier->run(getExecutor(), getExecutor(), cancelToken, factory); ASSERT_OK(future.getNoThrow()); DBDirectClient client(operationContext()); @@ -420,7 +443,6 @@ TEST_F(ReshardingOplogApplierTest, CanceledApplyingBatch) { auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps), 2 /* batchSize */); boost::optional<ReshardingOplogApplier> applier; - auto executor = makeTaskExecutorForApplier(); applier.emplace(makeApplierEnv(), sourceId(), @@ -433,9 +455,9 @@ TEST_F(ReshardingOplogApplierTest, CanceledApplyingBatch) { auto abortSource = CancellationSource(); abortSource.cancel(); auto cancelToken = abortSource.token(); - auto factory = makeCancelableOpCtxForApplier(cancelToken); + CancelableOperationContextFactory factory(cancelToken, getCancelableOpCtxExecutor()); - auto future = applier->run(executor, executor, cancelToken, factory); + auto future = applier->run(getExecutor(), getExecutor(), cancelToken, factory); ASSERT_EQ(future.getNoThrow(), ErrorCodes::CallbackCanceled); } @@ -451,7 +473,6 @@ TEST_F(ReshardingOplogApplierTest, InsertTypeOplogAppliedInMultipleBatches) { auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps), 3 /* batchSize */); boost::optional<ReshardingOplogApplier> applier; - auto executor = makeTaskExecutorForApplier(); applier.emplace(makeApplierEnv(), sourceId(), appliedToNs(), @@ -461,8 +482,8 @@ TEST_F(ReshardingOplogApplierTest, InsertTypeOplogAppliedInMultipleBatches) { std::move(iterator)); auto cancelToken = operationContext()->getCancellationToken(); - auto factory = makeCancelableOpCtxForApplier(cancelToken); - auto future = applier->run(executor, executor, cancelToken, factory); + CancelableOperationContextFactory factory(cancelToken, getCancelableOpCtxExecutor()); + auto future = applier->run(getExecutor(), getExecutor(), cancelToken, factory); ASSERT_OK(future.getNoThrow()); DBDirectClient client(operationContext()); @@ -492,7 +513,6 @@ TEST_F(ReshardingOplogApplierTest, ErrorDuringFirstBatchApply) { auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps), 4 /* batchSize */); boost::optional<ReshardingOplogApplier> applier; - auto executor = makeTaskExecutorForApplier(); applier.emplace(makeApplierEnv(), sourceId(), appliedToNs(), @@ -502,8 +522,8 @@ TEST_F(ReshardingOplogApplierTest, ErrorDuringFirstBatchApply) { std::move(iterator)); auto cancelToken = operationContext()->getCancellationToken(); - auto factory = makeCancelableOpCtxForApplier(cancelToken); - auto future = applier->run(executor, executor, cancelToken, factory); + CancelableOperationContextFactory factory(cancelToken, getCancelableOpCtxExecutor()); + auto future = applier->run(getExecutor(), getExecutor(), cancelToken, factory); ASSERT_EQ(future.getNoThrow(), ErrorCodes::FailedToParse); DBDirectClient client(operationContext()); @@ -535,7 +555,6 @@ TEST_F(ReshardingOplogApplierTest, ErrorDuringSecondBatchApply) { auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps), 2 /* batchSize */); boost::optional<ReshardingOplogApplier> applier; - auto executor = makeTaskExecutorForApplier(); applier.emplace(makeApplierEnv(), sourceId(), appliedToNs(), @@ -545,8 +564,8 @@ TEST_F(ReshardingOplogApplierTest, ErrorDuringSecondBatchApply) { std::move(iterator)); auto cancelToken = operationContext()->getCancellationToken(); - auto factory = makeCancelableOpCtxForApplier(cancelToken); - auto future = applier->run(executor, executor, cancelToken, factory); + CancelableOperationContextFactory factory(cancelToken, getCancelableOpCtxExecutor()); + auto future = applier->run(getExecutor(), getExecutor(), cancelToken, factory); ASSERT_EQ(future.getNoThrow(), ErrorCodes::FailedToParse); DBDirectClient client(operationContext()); @@ -577,7 +596,6 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingFirstOplog) { iterator->setThrowWhenSingleItem(); boost::optional<ReshardingOplogApplier> applier; - auto executor = makeTaskExecutorForApplier(); applier.emplace(makeApplierEnv(), sourceId(), appliedToNs(), @@ -587,8 +605,8 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingFirstOplog) { std::move(iterator)); auto cancelToken = operationContext()->getCancellationToken(); - auto factory = makeCancelableOpCtxForApplier(cancelToken); - auto future = applier->run(executor, executor, cancelToken, factory); + CancelableOperationContextFactory factory(cancelToken, getCancelableOpCtxExecutor()); + auto future = applier->run(getExecutor(), getExecutor(), cancelToken, factory); ASSERT_EQ(future.getNoThrow(), ErrorCodes::InternalError); DBDirectClient client(operationContext()); @@ -614,7 +632,6 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingFirstBatch) { iterator->setThrowWhenSingleItem(); boost::optional<ReshardingOplogApplier> applier; - auto executor = makeTaskExecutorForApplier(); applier.emplace(makeApplierEnv(), sourceId(), appliedToNs(), @@ -624,8 +641,8 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingFirstBatch) { std::move(iterator)); auto cancelToken = operationContext()->getCancellationToken(); - auto factory = makeCancelableOpCtxForApplier(cancelToken); - auto future = applier->run(executor, executor, cancelToken, factory); + CancelableOperationContextFactory factory(cancelToken, getCancelableOpCtxExecutor()); + auto future = applier->run(getExecutor(), getExecutor(), cancelToken, factory); ASSERT_EQ(future.getNoThrow(), ErrorCodes::InternalError); DBDirectClient client(operationContext()); @@ -655,7 +672,6 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingSecondBatch) { iterator->setThrowWhenSingleItem(); boost::optional<ReshardingOplogApplier> applier; - auto executor = makeTaskExecutorForApplier(); applier.emplace(makeApplierEnv(), sourceId(), appliedToNs(), @@ -665,8 +681,8 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingSecondBatch) { std::move(iterator)); auto cancelToken = operationContext()->getCancellationToken(); - auto factory = makeCancelableOpCtxForApplier(cancelToken); - auto future = applier->run(executor, executor, cancelToken, factory); + CancelableOperationContextFactory factory(cancelToken, getCancelableOpCtxExecutor()); + auto future = applier->run(getExecutor(), getExecutor(), cancelToken, factory); ASSERT_EQ(future.getNoThrow(), ErrorCodes::InternalError); DBDirectClient client(operationContext()); @@ -695,7 +711,6 @@ TEST_F(ReshardingOplogApplierTest, ExecutorIsShutDown) { auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps), 4 /* batchSize */); boost::optional<ReshardingOplogApplier> applier; - auto executor = makeTaskExecutorForApplier(); applier.emplace(makeApplierEnv(), sourceId(), appliedToNs(), @@ -704,11 +719,11 @@ TEST_F(ReshardingOplogApplierTest, ExecutorIsShutDown) { chunkManager(), std::move(iterator)); - executor->shutdown(); + getExecutor()->shutdown(); auto cancelToken = operationContext()->getCancellationToken(); - auto factory = makeCancelableOpCtxForApplier(cancelToken); - auto future = applier->run(executor, executor, cancelToken, factory); + CancelableOperationContextFactory factory(cancelToken, getCancelableOpCtxExecutor()); + auto future = applier->run(getExecutor(), getExecutor(), cancelToken, factory); ASSERT_EQ(future.getNoThrow(), ErrorCodes::ShutdownInProgress); DBDirectClient client(operationContext()); @@ -737,7 +752,6 @@ TEST_F(ReshardingOplogApplierTest, UnsupportedCommandOpsShouldError) { auto iterator = std::make_unique<OplogIteratorMock>(std::move(ops), 1 /* batchSize */); boost::optional<ReshardingOplogApplier> applier; - auto executor = makeTaskExecutorForApplier(); applier.emplace(makeApplierEnv(), sourceId(), appliedToNs(), @@ -747,8 +761,8 @@ TEST_F(ReshardingOplogApplierTest, UnsupportedCommandOpsShouldError) { std::move(iterator)); auto cancelToken = operationContext()->getCancellationToken(); - auto factory = makeCancelableOpCtxForApplier(cancelToken); - auto future = applier->run(executor, executor, cancelToken, factory); + CancelableOperationContextFactory factory(cancelToken, getCancelableOpCtxExecutor()); + auto future = applier->run(getExecutor(), getExecutor(), cancelToken, factory); ASSERT_EQ(future.getNoThrow(), ErrorCodes::OplogOperationUnsupported); DBDirectClient client(operationContext()); @@ -774,7 +788,6 @@ TEST_F(ReshardingOplogApplierTest, DropSourceCollectionCmdShouldError) { auto iterator = std::make_unique<OplogIteratorMock>(std::move(ops), 1 /* batchSize */); boost::optional<ReshardingOplogApplier> applier; - auto executor = makeTaskExecutorForApplier(); applier.emplace(makeApplierEnv(), sourceId(), appliedToNs(), @@ -784,8 +797,8 @@ TEST_F(ReshardingOplogApplierTest, DropSourceCollectionCmdShouldError) { std::move(iterator)); auto cancelToken = operationContext()->getCancellationToken(); - auto factory = makeCancelableOpCtxForApplier(cancelToken); - auto future = applier->run(executor, executor, cancelToken, factory); + CancelableOperationContextFactory factory(cancelToken, getCancelableOpCtxExecutor()); + auto future = applier->run(getExecutor(), getExecutor(), cancelToken, factory); ASSERT_EQ(future.getNoThrow(), ErrorCodes::OplogOperationUnsupported); auto progressDoc = ReshardingOplogApplier::checkStoredProgress(operationContext(), sourceId()); @@ -793,7 +806,6 @@ TEST_F(ReshardingOplogApplierTest, DropSourceCollectionCmdShouldError) { } TEST_F(ReshardingOplogApplierTest, MetricsAreReported) { - auto executor = makeTaskExecutorForApplier(); // Compress the makeOplog syntax a little further for this special case. using OpT = repl::OpTypeEnum; auto easyOp = [this](auto ts, OpT opType, BSONObj obj1, boost::optional<BSONObj> obj2 = {}) { @@ -818,8 +830,8 @@ TEST_F(ReshardingOplogApplierTest, MetricsAreReported) { ASSERT_EQ(metricsAppliedCount(), 0); auto cancelToken = operationContext()->getCancellationToken(); - auto factory = makeCancelableOpCtxForApplier(cancelToken); - auto future = applier.run(executor, executor, cancelToken, factory); + CancelableOperationContextFactory factory(cancelToken, getCancelableOpCtxExecutor()); + auto future = applier.run(getExecutor(), getExecutor(), cancelToken, factory); ASSERT_OK(future.getNoThrow()); auto opCountersObj = getMetricsOpCounters(); |