summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorAmirsaman Memaripour <amirsaman.memaripour@mongodb.com>2021-10-15 15:57:16 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-10-15 16:32:33 +0000
commit2bf37b1ac522b39e186388ea382a5a14c0636da6 (patch)
treef00fef81a0aed90aa1d8f3776952a31d1535d1cb /src/mongo/db
parent7af0dfde3735b90d6e5467f45077152f27c77584 (diff)
downloadmongo-2bf37b1ac522b39e186388ea382a5a14c0636da6.tar.gz
SERVER-57784 Shutdown and join executors before returning from ReshardingOplogApplier tests
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp102
1 files changed, 57 insertions, 45 deletions
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
index f1da6eb561f..aead93e4313 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
@@ -158,6 +158,22 @@ public:
_metrics->onStart(ReshardingMetrics::Role::kRecipient,
getServiceContext()->getFastClockSource()->now());
_metrics->setRecipientState(RecipientStateEnum::kApplying);
+
+ _executor = makeTaskExecutorForApplier();
+ _executor->startup();
+
+ _cancelableOpCtxExecutor = makeExecutorForCancelableOpCtx();
+ _cancelableOpCtxExecutor->startup();
+ }
+
+ void tearDown() {
+ _executor->shutdown();
+ _executor->join();
+
+ _cancelableOpCtxExecutor->shutdown();
+ _cancelableOpCtxExecutor->join();
+
+ ShardingMongodTestFixture::tearDown();
}
ChunkManager createChunkManagerForOriginalColl() {
@@ -269,6 +285,15 @@ public:
return bob.obj()["oplogEntriesApplied"_sd].Long();
}
+ std::shared_ptr<executor::ThreadPoolTaskExecutor> getExecutor() {
+ return _executor;
+ }
+
+
+ std::shared_ptr<ThreadPool> getCancelableOpCtxExecutor() {
+ return _cancelableOpCtxExecutor;
+ }
+
protected:
auto makeApplierEnv() {
return std::make_unique<ReshardingOplogApplier::Env>(getServiceContext(), &*_metrics);
@@ -301,20 +326,17 @@ protected:
executor::makeNetworkInterface(
"TestReshardOplogApplicationNetwork", nullptr, std::move(hookList)));
- executor->startup();
return executor;
}
- CancelableOperationContextFactory makeCancelableOpCtxForApplier(CancellationToken cancelToken) {
- auto executor = std::make_shared<ThreadPool>([] {
+ std::shared_ptr<ThreadPool> makeExecutorForCancelableOpCtx() {
+ return std::make_shared<ThreadPool>([] {
ThreadPool::Options options;
options.poolName = "TestReshardOplogApplierCancelableOpCtxPool";
options.minThreads = 1;
options.maxThreads = 1;
return options;
}());
-
- return CancelableOperationContextFactory(cancelToken, executor);
}
static constexpr int kWriterPoolSize = 4;
@@ -331,6 +353,9 @@ protected:
const ReshardingSourceId _sourceId{UUID::gen(), kMyShardId};
std::unique_ptr<ReshardingMetrics> _metrics;
+
+ std::shared_ptr<executor::ThreadPoolTaskExecutor> _executor;
+ std::shared_ptr<ThreadPool> _cancelableOpCtxExecutor;
};
TEST_F(ReshardingOplogApplierTest, NothingToIterate) {
@@ -338,7 +363,6 @@ TEST_F(ReshardingOplogApplierTest, NothingToIterate) {
auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps), 2 /* batchSize */);
boost::optional<ReshardingOplogApplier> applier;
- auto executor = makeTaskExecutorForApplier();
applier.emplace(makeApplierEnv(),
sourceId(),
@@ -349,8 +373,8 @@ TEST_F(ReshardingOplogApplierTest, NothingToIterate) {
std::move(iterator));
auto cancelToken = operationContext()->getCancellationToken();
- auto factory = makeCancelableOpCtxForApplier(cancelToken);
- auto future = applier->run(executor, executor, cancelToken, factory);
+ CancelableOperationContextFactory factory(cancelToken, getCancelableOpCtxExecutor());
+ auto future = applier->run(getExecutor(), getExecutor(), cancelToken, factory);
ASSERT_OK(future.getNoThrow());
}
@@ -375,7 +399,6 @@ TEST_F(ReshardingOplogApplierTest, ApplyBasicCrud) {
auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps), 2 /* batchSize */);
boost::optional<ReshardingOplogApplier> applier;
- auto executor = makeTaskExecutorForApplier();
applier.emplace(makeApplierEnv(),
sourceId(),
appliedToNs(),
@@ -385,8 +408,8 @@ TEST_F(ReshardingOplogApplierTest, ApplyBasicCrud) {
std::move(iterator));
auto cancelToken = operationContext()->getCancellationToken();
- auto factory = makeCancelableOpCtxForApplier(cancelToken);
- auto future = applier->run(executor, executor, cancelToken, factory);
+ CancelableOperationContextFactory factory(cancelToken, getCancelableOpCtxExecutor());
+ auto future = applier->run(getExecutor(), getExecutor(), cancelToken, factory);
ASSERT_OK(future.getNoThrow());
DBDirectClient client(operationContext());
@@ -420,7 +443,6 @@ TEST_F(ReshardingOplogApplierTest, CanceledApplyingBatch) {
auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps), 2 /* batchSize */);
boost::optional<ReshardingOplogApplier> applier;
- auto executor = makeTaskExecutorForApplier();
applier.emplace(makeApplierEnv(),
sourceId(),
@@ -433,9 +455,9 @@ TEST_F(ReshardingOplogApplierTest, CanceledApplyingBatch) {
auto abortSource = CancellationSource();
abortSource.cancel();
auto cancelToken = abortSource.token();
- auto factory = makeCancelableOpCtxForApplier(cancelToken);
+ CancelableOperationContextFactory factory(cancelToken, getCancelableOpCtxExecutor());
- auto future = applier->run(executor, executor, cancelToken, factory);
+ auto future = applier->run(getExecutor(), getExecutor(), cancelToken, factory);
ASSERT_EQ(future.getNoThrow(), ErrorCodes::CallbackCanceled);
}
@@ -451,7 +473,6 @@ TEST_F(ReshardingOplogApplierTest, InsertTypeOplogAppliedInMultipleBatches) {
auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps), 3 /* batchSize */);
boost::optional<ReshardingOplogApplier> applier;
- auto executor = makeTaskExecutorForApplier();
applier.emplace(makeApplierEnv(),
sourceId(),
appliedToNs(),
@@ -461,8 +482,8 @@ TEST_F(ReshardingOplogApplierTest, InsertTypeOplogAppliedInMultipleBatches) {
std::move(iterator));
auto cancelToken = operationContext()->getCancellationToken();
- auto factory = makeCancelableOpCtxForApplier(cancelToken);
- auto future = applier->run(executor, executor, cancelToken, factory);
+ CancelableOperationContextFactory factory(cancelToken, getCancelableOpCtxExecutor());
+ auto future = applier->run(getExecutor(), getExecutor(), cancelToken, factory);
ASSERT_OK(future.getNoThrow());
DBDirectClient client(operationContext());
@@ -492,7 +513,6 @@ TEST_F(ReshardingOplogApplierTest, ErrorDuringFirstBatchApply) {
auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps), 4 /* batchSize */);
boost::optional<ReshardingOplogApplier> applier;
- auto executor = makeTaskExecutorForApplier();
applier.emplace(makeApplierEnv(),
sourceId(),
appliedToNs(),
@@ -502,8 +522,8 @@ TEST_F(ReshardingOplogApplierTest, ErrorDuringFirstBatchApply) {
std::move(iterator));
auto cancelToken = operationContext()->getCancellationToken();
- auto factory = makeCancelableOpCtxForApplier(cancelToken);
- auto future = applier->run(executor, executor, cancelToken, factory);
+ CancelableOperationContextFactory factory(cancelToken, getCancelableOpCtxExecutor());
+ auto future = applier->run(getExecutor(), getExecutor(), cancelToken, factory);
ASSERT_EQ(future.getNoThrow(), ErrorCodes::FailedToParse);
DBDirectClient client(operationContext());
@@ -535,7 +555,6 @@ TEST_F(ReshardingOplogApplierTest, ErrorDuringSecondBatchApply) {
auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps), 2 /* batchSize */);
boost::optional<ReshardingOplogApplier> applier;
- auto executor = makeTaskExecutorForApplier();
applier.emplace(makeApplierEnv(),
sourceId(),
appliedToNs(),
@@ -545,8 +564,8 @@ TEST_F(ReshardingOplogApplierTest, ErrorDuringSecondBatchApply) {
std::move(iterator));
auto cancelToken = operationContext()->getCancellationToken();
- auto factory = makeCancelableOpCtxForApplier(cancelToken);
- auto future = applier->run(executor, executor, cancelToken, factory);
+ CancelableOperationContextFactory factory(cancelToken, getCancelableOpCtxExecutor());
+ auto future = applier->run(getExecutor(), getExecutor(), cancelToken, factory);
ASSERT_EQ(future.getNoThrow(), ErrorCodes::FailedToParse);
DBDirectClient client(operationContext());
@@ -577,7 +596,6 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingFirstOplog) {
iterator->setThrowWhenSingleItem();
boost::optional<ReshardingOplogApplier> applier;
- auto executor = makeTaskExecutorForApplier();
applier.emplace(makeApplierEnv(),
sourceId(),
appliedToNs(),
@@ -587,8 +605,8 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingFirstOplog) {
std::move(iterator));
auto cancelToken = operationContext()->getCancellationToken();
- auto factory = makeCancelableOpCtxForApplier(cancelToken);
- auto future = applier->run(executor, executor, cancelToken, factory);
+ CancelableOperationContextFactory factory(cancelToken, getCancelableOpCtxExecutor());
+ auto future = applier->run(getExecutor(), getExecutor(), cancelToken, factory);
ASSERT_EQ(future.getNoThrow(), ErrorCodes::InternalError);
DBDirectClient client(operationContext());
@@ -614,7 +632,6 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingFirstBatch) {
iterator->setThrowWhenSingleItem();
boost::optional<ReshardingOplogApplier> applier;
- auto executor = makeTaskExecutorForApplier();
applier.emplace(makeApplierEnv(),
sourceId(),
appliedToNs(),
@@ -624,8 +641,8 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingFirstBatch) {
std::move(iterator));
auto cancelToken = operationContext()->getCancellationToken();
- auto factory = makeCancelableOpCtxForApplier(cancelToken);
- auto future = applier->run(executor, executor, cancelToken, factory);
+ CancelableOperationContextFactory factory(cancelToken, getCancelableOpCtxExecutor());
+ auto future = applier->run(getExecutor(), getExecutor(), cancelToken, factory);
ASSERT_EQ(future.getNoThrow(), ErrorCodes::InternalError);
DBDirectClient client(operationContext());
@@ -655,7 +672,6 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingSecondBatch) {
iterator->setThrowWhenSingleItem();
boost::optional<ReshardingOplogApplier> applier;
- auto executor = makeTaskExecutorForApplier();
applier.emplace(makeApplierEnv(),
sourceId(),
appliedToNs(),
@@ -665,8 +681,8 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingSecondBatch) {
std::move(iterator));
auto cancelToken = operationContext()->getCancellationToken();
- auto factory = makeCancelableOpCtxForApplier(cancelToken);
- auto future = applier->run(executor, executor, cancelToken, factory);
+ CancelableOperationContextFactory factory(cancelToken, getCancelableOpCtxExecutor());
+ auto future = applier->run(getExecutor(), getExecutor(), cancelToken, factory);
ASSERT_EQ(future.getNoThrow(), ErrorCodes::InternalError);
DBDirectClient client(operationContext());
@@ -695,7 +711,6 @@ TEST_F(ReshardingOplogApplierTest, ExecutorIsShutDown) {
auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps), 4 /* batchSize */);
boost::optional<ReshardingOplogApplier> applier;
- auto executor = makeTaskExecutorForApplier();
applier.emplace(makeApplierEnv(),
sourceId(),
appliedToNs(),
@@ -704,11 +719,11 @@ TEST_F(ReshardingOplogApplierTest, ExecutorIsShutDown) {
chunkManager(),
std::move(iterator));
- executor->shutdown();
+ getExecutor()->shutdown();
auto cancelToken = operationContext()->getCancellationToken();
- auto factory = makeCancelableOpCtxForApplier(cancelToken);
- auto future = applier->run(executor, executor, cancelToken, factory);
+ CancelableOperationContextFactory factory(cancelToken, getCancelableOpCtxExecutor());
+ auto future = applier->run(getExecutor(), getExecutor(), cancelToken, factory);
ASSERT_EQ(future.getNoThrow(), ErrorCodes::ShutdownInProgress);
DBDirectClient client(operationContext());
@@ -737,7 +752,6 @@ TEST_F(ReshardingOplogApplierTest, UnsupportedCommandOpsShouldError) {
auto iterator = std::make_unique<OplogIteratorMock>(std::move(ops), 1 /* batchSize */);
boost::optional<ReshardingOplogApplier> applier;
- auto executor = makeTaskExecutorForApplier();
applier.emplace(makeApplierEnv(),
sourceId(),
appliedToNs(),
@@ -747,8 +761,8 @@ TEST_F(ReshardingOplogApplierTest, UnsupportedCommandOpsShouldError) {
std::move(iterator));
auto cancelToken = operationContext()->getCancellationToken();
- auto factory = makeCancelableOpCtxForApplier(cancelToken);
- auto future = applier->run(executor, executor, cancelToken, factory);
+ CancelableOperationContextFactory factory(cancelToken, getCancelableOpCtxExecutor());
+ auto future = applier->run(getExecutor(), getExecutor(), cancelToken, factory);
ASSERT_EQ(future.getNoThrow(), ErrorCodes::OplogOperationUnsupported);
DBDirectClient client(operationContext());
@@ -774,7 +788,6 @@ TEST_F(ReshardingOplogApplierTest, DropSourceCollectionCmdShouldError) {
auto iterator = std::make_unique<OplogIteratorMock>(std::move(ops), 1 /* batchSize */);
boost::optional<ReshardingOplogApplier> applier;
- auto executor = makeTaskExecutorForApplier();
applier.emplace(makeApplierEnv(),
sourceId(),
appliedToNs(),
@@ -784,8 +797,8 @@ TEST_F(ReshardingOplogApplierTest, DropSourceCollectionCmdShouldError) {
std::move(iterator));
auto cancelToken = operationContext()->getCancellationToken();
- auto factory = makeCancelableOpCtxForApplier(cancelToken);
- auto future = applier->run(executor, executor, cancelToken, factory);
+ CancelableOperationContextFactory factory(cancelToken, getCancelableOpCtxExecutor());
+ auto future = applier->run(getExecutor(), getExecutor(), cancelToken, factory);
ASSERT_EQ(future.getNoThrow(), ErrorCodes::OplogOperationUnsupported);
auto progressDoc = ReshardingOplogApplier::checkStoredProgress(operationContext(), sourceId());
@@ -793,7 +806,6 @@ TEST_F(ReshardingOplogApplierTest, DropSourceCollectionCmdShouldError) {
}
TEST_F(ReshardingOplogApplierTest, MetricsAreReported) {
- auto executor = makeTaskExecutorForApplier();
// Compress the makeOplog syntax a little further for this special case.
using OpT = repl::OpTypeEnum;
auto easyOp = [this](auto ts, OpT opType, BSONObj obj1, boost::optional<BSONObj> obj2 = {}) {
@@ -818,8 +830,8 @@ TEST_F(ReshardingOplogApplierTest, MetricsAreReported) {
ASSERT_EQ(metricsAppliedCount(), 0);
auto cancelToken = operationContext()->getCancellationToken();
- auto factory = makeCancelableOpCtxForApplier(cancelToken);
- auto future = applier.run(executor, executor, cancelToken, factory);
+ CancelableOperationContextFactory factory(cancelToken, getCancelableOpCtxExecutor());
+ auto future = applier.run(getExecutor(), getExecutor(), cancelToken, factory);
ASSERT_OK(future.getNoThrow());
auto opCountersObj = getMetricsOpCounters();