diff options
author | Max Hirschhorn <max.hirschhorn@mongodb.com> | 2020-12-24 12:13:16 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-12-24 12:40:28 +0000 |
commit | 21418579095fa5ff44a851c2feb62ea4773ac3a6 (patch) | |
tree | 14dd7ba3bd5954e18c6371991fafdca04f1fcd3d /src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp | |
parent | ac9e2bcb4346a4aa5347e070d974f9bd7ab7c57d (diff) | |
download | mongo-21418579095fa5ff44a851c2feb62ea4773ac3a6.tar.gz |
SERVER-53108 Move batching logic into ReshardingDonorOplogIterator.
Raises the default value of the reshardingBatchLimitOperations server
parameter to 5,000 oplog entries to match that of the
replBatchLimitOperations server parameter.
Also introduces a reshardingBatchLimitBytes server parameter.
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp')
-rw-r--r-- | src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp | 168 |
1 files changed, 132 insertions, 36 deletions
diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp index e29851f0d29..021e42f6921 100644 --- a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp @@ -33,9 +33,11 @@ #include "mongo/db/dbdirectclient.h" #include "mongo/db/s/resharding/resharding_donor_oplog_iterator.h" +#include "mongo/db/s/resharding/resharding_server_parameters_gen.h" #include "mongo/db/s/resharding_util.h" #include "mongo/db/s/sharding_mongod_test_fixture.h" #include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/unittest/unittest.h" #include "mongo/logv2/log.h" @@ -43,6 +45,8 @@ namespace mongo { namespace { +const ReshardingDonorOplogId kResumeFromBeginning{Timestamp::min(), Timestamp::min()}; + repl::MutableOplogEntry makeOplog(const NamespaceString& nss, const UUID& uuid, const repl::OpTypeEnum& opType, @@ -72,6 +76,21 @@ public: } } onInsertAlwaysReady; +class ScopedServerParameterChange { +public: + ScopedServerParameterChange(int* param, int newValue) : _param(param), _originalValue(*_param) { + *param = newValue; + } + + ~ScopedServerParameterChange() { + *_param = _originalValue; + } + +private: + int* const _param; + const int _originalValue; +}; + class ReshardingDonorOplogIterTest : public ShardingMongodTestFixture { public: repl::MutableOplogEntry makeInsertOplog(const Timestamp& id, BSONObj doc) { @@ -100,10 +119,52 @@ public: return oplog.get_id()->getDocument().toBson(); } + std::shared_ptr<executor::ThreadPoolTaskExecutor> makeTaskExecutorForIterator() { + // The ReshardingDonorOplogIterator expects there to already be a Client associated with the + // thread from the thread pool. We set up the ThreadPoolTaskExecutor similarly to how the + // recipient's primary-only service is set up. + executor::ThreadPoolMock::Options threadPoolOptions; + threadPoolOptions.onCreateThread = [] { + Client::initThread("TestReshardingDonorOplogIterator"); + auto& client = cc(); + { + stdx::lock_guard<Client> lk(client); + client.setSystemOperationKillableByStepdown(lk); + } + }; + + auto executor = executor::makeThreadPoolTestExecutor( + std::make_unique<executor::NetworkInterfaceMock>(), std::move(threadPoolOptions)); + + executor->startup(); + return executor; + } + + auto getNextBatch(ReshardingDonorOplogIterator* iter, + std::shared_ptr<executor::TaskExecutor> executor) { + // There isn't a guarantee that the reference count to `executor` has been decremented after + // .get() returns. We schedule a trivial task on the task executor to ensure the callback's + // destructor has run. Otherwise `executor` could end up outliving the ServiceContext and + // triggering an invariant due to the task executor's thread having a Client still. + return ExecutorFuture(executor) + .then([iter, executor] { return iter->getNextBatch(std::move(executor)); }) + .then([](auto x) { return x; }) + .get(); + } + + ServiceContext::UniqueClient makeKillableClient() { + auto client = getServiceContext()->makeClient("ReshardingDonorOplogIterator"); + stdx::lock_guard<Client> lk(*client); + client->setSystemOperationKillableByStepdown(lk); + return client; + } + private: const NamespaceString _oplogNss{"config.localReshardingOplogBuffer.xxx.yyy"}; const NamespaceString _crudNss{"test.foo"}; const UUID _uuid{UUID::gen()}; + + ScopedServerParameterChange _iteratorBatchSize{&resharding::gReshardingBatchLimitOperations, 1}; }; TEST_F(ReshardingDonorOplogIterTest, BasicExhaust) { @@ -119,23 +180,24 @@ TEST_F(ReshardingDonorOplogIterTest, BasicExhaust) { client.insert(ns, finalOplog.toBSON()); client.insert(ns, oplogBeyond.toBSON()); - ReshardingDonorOplogIterator iter(oplogNss(), boost::none, &onInsertAlwaysReady); - ASSERT_TRUE(iter.hasMore()); - auto next = iter.getNext(operationContext()).get(); + ReshardingDonorOplogIterator iter(oplogNss(), kResumeFromBeginning, &onInsertAlwaysReady); + auto executor = makeTaskExecutorForIterator(); + auto altClient = makeKillableClient(); + AlternativeClientRegion acr(altClient); - ASSERT_BSONOBJ_EQ(getId(oplog1), getId(*next)); + auto next = getNextBatch(&iter, executor); + ASSERT_EQ(next.size(), 1U); + ASSERT_BSONOBJ_EQ(getId(oplog1), getId(next[0])); - ASSERT_TRUE(iter.hasMore()); - next = iter.getNext(operationContext()).get(); - ASSERT_BSONOBJ_EQ(getId(oplog2), getId(*next)); + next = getNextBatch(&iter, executor); + ASSERT_EQ(next.size(), 1U); + ASSERT_BSONOBJ_EQ(getId(oplog2), getId(next[0])); - ASSERT_TRUE(iter.hasMore()); - next = iter.getNext(operationContext()).get(); - ASSERT_FALSE(next); + next = getNextBatch(&iter, executor); + ASSERT_TRUE(next.empty()); - ASSERT_FALSE(iter.hasMore()); - next = iter.getNext(operationContext()).get(); - ASSERT_FALSE(next); + next = getNextBatch(&iter, executor); + ASSERT_TRUE(next.empty()); } TEST_F(ReshardingDonorOplogIterTest, ResumeFromMiddle) { @@ -151,15 +213,16 @@ TEST_F(ReshardingDonorOplogIterTest, ResumeFromMiddle) { ReshardingDonorOplogId resumeToken(Timestamp(2, 4), Timestamp(2, 4)); ReshardingDonorOplogIterator iter(oplogNss(), resumeToken, &onInsertAlwaysReady); - ASSERT_TRUE(iter.hasMore()); - auto next = iter.getNext(operationContext()).get(); - ASSERT_BSONOBJ_EQ(getId(oplog2), getId(*next)); + auto executor = makeTaskExecutorForIterator(); + auto altClient = makeKillableClient(); + AlternativeClientRegion acr(altClient); - ASSERT_TRUE(iter.hasMore()); - next = iter.getNext(operationContext()).get(); - ASSERT_FALSE(next); + auto next = getNextBatch(&iter, executor); + ASSERT_EQ(next.size(), 1U); + ASSERT_BSONOBJ_EQ(getId(oplog2), getId(next[0])); - ASSERT_FALSE(iter.hasMore()); + next = getNextBatch(&iter, executor); + ASSERT_TRUE(next.empty()); } TEST_F(ReshardingDonorOplogIterTest, ExhaustWithIncomingInserts) { @@ -172,27 +235,60 @@ TEST_F(ReshardingDonorOplogIterTest, ExhaustWithIncomingInserts) { const auto ns = oplogNss().ns(); client.insert(ns, oplog1.toBSON()); - ReshardingDonorOplogIterator iter(oplogNss(), boost::none, &onInsertAlwaysReady); - ASSERT_TRUE(iter.hasMore()); - auto next = iter.getNext(operationContext()).get(); - ASSERT_BSONOBJ_EQ(getId(oplog1), getId(*next)); + class InsertNotifier : public resharding::OnInsertAwaitable { + public: + using Callback = std::function<void(OperationContext*, size_t)>; - ASSERT_TRUE(iter.hasMore()); + InsertNotifier(ServiceContext* serviceContext, Callback onAwaitInsertCalled) + : _serviceContext(serviceContext), _onAwaitInsertCalled(onAwaitInsertCalled) {} - client.insert(ns, oplog2.toBSON()); - client.insert(ns, finalOplog.toBSON()); - client.insert(ns, oplogBeyond.toBSON()); + Future<void> awaitInsert(const ReshardingDonorOplogId& lastSeen) override { + ++numCalls; + + auto client = _serviceContext->makeClient("onAwaitInsertCalled"); + AlternativeClientRegion acr(client); + auto opCtx = cc().makeOperationContext(); + _onAwaitInsertCalled(opCtx.get(), numCalls); + + return Future<void>::makeReady(); + } + + size_t numCalls = 0; + + private: + ServiceContext* _serviceContext; + Callback _onAwaitInsertCalled; + } insertNotifier{getServiceContext(), [&](OperationContext* opCtx, size_t numCalls) { + DBDirectClient client(opCtx); + + if (numCalls == 1) { + client.insert(ns, oplog2.toBSON()); + } else { + client.insert(ns, finalOplog.toBSON()); + client.insert(ns, oplogBeyond.toBSON()); + } + }}; + + ReshardingDonorOplogIterator iter(oplogNss(), kResumeFromBeginning, &insertNotifier); + auto executor = makeTaskExecutorForIterator(); + auto altClient = makeKillableClient(); + AlternativeClientRegion acr(altClient); + + auto next = getNextBatch(&iter, executor); + ASSERT_EQ(next.size(), 1U); + ASSERT_BSONOBJ_EQ(getId(oplog1), getId(next[0])); + + next = getNextBatch(&iter, executor); + ASSERT_EQ(next.size(), 1U); + ASSERT_BSONOBJ_EQ(getId(oplog2), getId(next[0])); - next = iter.getNext(operationContext()).get(); - ASSERT_BSONOBJ_EQ(getId(oplog2), getId(*next)); + next = getNextBatch(&iter, executor); + ASSERT_TRUE(next.empty()); - ASSERT_TRUE(iter.hasMore()); - next = iter.getNext(operationContext()).get(); - ASSERT_FALSE(next); + next = getNextBatch(&iter, executor); + ASSERT_TRUE(next.empty()); - ASSERT_FALSE(iter.hasMore()); - next = iter.getNext(operationContext()).get(); - ASSERT_FALSE(next); + ASSERT_EQ(insertNotifier.numCalls, 2U); } } // anonymous namespace |