author      Max Hirschhorn <max.hirschhorn@mongodb.com>    2020-12-24 12:13:16 +0000
committer   Evergreen Agent <no-reply@evergreen.mongodb.com>    2020-12-24 12:40:28 +0000
commit      21418579095fa5ff44a851c2feb62ea4773ac3a6 (patch)
tree        14dd7ba3bd5954e18c6371991fafdca04f1fcd3d /src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp
parent      ac9e2bcb4346a4aa5347e070d974f9bd7ab7c57d (diff)
download    mongo-21418579095fa5ff44a851c2feb62ea4773ac3a6.tar.gz
SERVER-53108 Move batching logic into ReshardingDonorOplogIterator.
Raises the default value of the reshardingBatchLimitOperations server parameter to 5,000 oplog entries to match that of the replBatchLimitOperations server parameter. Also introduces a reshardingBatchLimitBytes server parameter.
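
For illustration only, the batch-cutting policy described above can be sketched in isolation: a batch is closed once it reaches the operation limit, or once adding the next entry would exceed the byte limit, whichever comes first. The sketch below is a self-contained stand-in, not the real implementation; the constants are placeholders rather than the generated server-parameter globals, and the byte value is arbitrary because this commit message does not state the default for reshardingBatchLimitBytes. The real accounting lives inside ReshardingDonorOplogIterator.

    // Stand-alone sketch of count- and byte-limited batching (illustrative only).
    #include <cstddef>
    #include <string>
    #include <vector>

    namespace {

    constexpr std::size_t kBatchLimitOperations = 5000;        // mirrors the new default above
    constexpr std::size_t kBatchLimitBytes = 16 * 1024 * 1024; // placeholder, not the real default

    std::vector<std::vector<std::string>> makeBatches(const std::vector<std::string>& entries) {
        std::vector<std::vector<std::string>> batches;
        std::vector<std::string> current;
        std::size_t currentBytes = 0;

        for (const auto& entry : entries) {
            // Close the current batch once either limit would be breached by this entry.
            if (!current.empty() &&
                (current.size() >= kBatchLimitOperations ||
                 currentBytes + entry.size() > kBatchLimitBytes)) {
                batches.push_back(std::move(current));
                current.clear();
                currentBytes = 0;
            }
            currentBytes += entry.size();
            current.push_back(entry);
        }
        if (!current.empty()) {
            batches.push_back(std::move(current));
        }
        return batches;
    }

    }  // namespace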
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp')
-rw-r--r--    src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp    168
1 file changed, 132 insertions, 36 deletions
diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp
index e29851f0d29..021e42f6921 100644
--- a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp
@@ -33,9 +33,11 @@
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/s/resharding/resharding_donor_oplog_iterator.h"
+#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
#include "mongo/db/s/resharding_util.h"
#include "mongo/db/s/sharding_mongod_test_fixture.h"
#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/unittest/unittest.h"
#include "mongo/logv2/log.h"
@@ -43,6 +45,8 @@
namespace mongo {
namespace {
+const ReshardingDonorOplogId kResumeFromBeginning{Timestamp::min(), Timestamp::min()};
+
repl::MutableOplogEntry makeOplog(const NamespaceString& nss,
const UUID& uuid,
const repl::OpTypeEnum& opType,
@@ -72,6 +76,21 @@ public:
}
} onInsertAlwaysReady;
+class ScopedServerParameterChange {
+public:
+ ScopedServerParameterChange(int* param, int newValue) : _param(param), _originalValue(*_param) {
+ *param = newValue;
+ }
+
+ ~ScopedServerParameterChange() {
+ *_param = _originalValue;
+ }
+
+private:
+ int* const _param;
+ const int _originalValue;
+};
+
class ReshardingDonorOplogIterTest : public ShardingMongodTestFixture {
public:
repl::MutableOplogEntry makeInsertOplog(const Timestamp& id, BSONObj doc) {
@@ -100,10 +119,52 @@ public:
return oplog.get_id()->getDocument().toBson();
}
+ std::shared_ptr<executor::ThreadPoolTaskExecutor> makeTaskExecutorForIterator() {
+ // The ReshardingDonorOplogIterator expects there to already be a Client associated with the
+ // thread from the thread pool. We set up the ThreadPoolTaskExecutor similarly to how the
+ // recipient's primary-only service is set up.
+ executor::ThreadPoolMock::Options threadPoolOptions;
+ threadPoolOptions.onCreateThread = [] {
+ Client::initThread("TestReshardingDonorOplogIterator");
+ auto& client = cc();
+ {
+ stdx::lock_guard<Client> lk(client);
+ client.setSystemOperationKillableByStepdown(lk);
+ }
+ };
+
+ auto executor = executor::makeThreadPoolTestExecutor(
+ std::make_unique<executor::NetworkInterfaceMock>(), std::move(threadPoolOptions));
+
+ executor->startup();
+ return executor;
+ }
+
+ auto getNextBatch(ReshardingDonorOplogIterator* iter,
+ std::shared_ptr<executor::TaskExecutor> executor) {
+ // There isn't a guarantee that the reference count to `executor` has been decremented after
+ // .get() returns. We schedule a trivial task on the task executor to ensure the callback's
+ // destructor has run. Otherwise `executor` could end up outliving the ServiceContext and
+ // triggering an invariant due to the task executor's thread having a Client still.
+ return ExecutorFuture(executor)
+ .then([iter, executor] { return iter->getNextBatch(std::move(executor)); })
+ .then([](auto x) { return x; })
+ .get();
+ }
+
+ ServiceContext::UniqueClient makeKillableClient() {
+ auto client = getServiceContext()->makeClient("ReshardingDonorOplogIterator");
+ stdx::lock_guard<Client> lk(*client);
+ client->setSystemOperationKillableByStepdown(lk);
+ return client;
+ }
+
private:
const NamespaceString _oplogNss{"config.localReshardingOplogBuffer.xxx.yyy"};
const NamespaceString _crudNss{"test.foo"};
const UUID _uuid{UUID::gen()};
+
+ ScopedServerParameterChange _iteratorBatchSize{&resharding::gReshardingBatchLimitOperations, 1};
};
TEST_F(ReshardingDonorOplogIterTest, BasicExhaust) {
@@ -119,23 +180,24 @@ TEST_F(ReshardingDonorOplogIterTest, BasicExhaust) {
client.insert(ns, finalOplog.toBSON());
client.insert(ns, oplogBeyond.toBSON());
- ReshardingDonorOplogIterator iter(oplogNss(), boost::none, &onInsertAlwaysReady);
- ASSERT_TRUE(iter.hasMore());
- auto next = iter.getNext(operationContext()).get();
+ ReshardingDonorOplogIterator iter(oplogNss(), kResumeFromBeginning, &onInsertAlwaysReady);
+ auto executor = makeTaskExecutorForIterator();
+ auto altClient = makeKillableClient();
+ AlternativeClientRegion acr(altClient);
- ASSERT_BSONOBJ_EQ(getId(oplog1), getId(*next));
+ auto next = getNextBatch(&iter, executor);
+ ASSERT_EQ(next.size(), 1U);
+ ASSERT_BSONOBJ_EQ(getId(oplog1), getId(next[0]));
- ASSERT_TRUE(iter.hasMore());
- next = iter.getNext(operationContext()).get();
- ASSERT_BSONOBJ_EQ(getId(oplog2), getId(*next));
+ next = getNextBatch(&iter, executor);
+ ASSERT_EQ(next.size(), 1U);
+ ASSERT_BSONOBJ_EQ(getId(oplog2), getId(next[0]));
- ASSERT_TRUE(iter.hasMore());
- next = iter.getNext(operationContext()).get();
- ASSERT_FALSE(next);
+ next = getNextBatch(&iter, executor);
+ ASSERT_TRUE(next.empty());
- ASSERT_FALSE(iter.hasMore());
- next = iter.getNext(operationContext()).get();
- ASSERT_FALSE(next);
+ next = getNextBatch(&iter, executor);
+ ASSERT_TRUE(next.empty());
}
TEST_F(ReshardingDonorOplogIterTest, ResumeFromMiddle) {
@@ -151,15 +213,16 @@ TEST_F(ReshardingDonorOplogIterTest, ResumeFromMiddle) {
ReshardingDonorOplogId resumeToken(Timestamp(2, 4), Timestamp(2, 4));
ReshardingDonorOplogIterator iter(oplogNss(), resumeToken, &onInsertAlwaysReady);
- ASSERT_TRUE(iter.hasMore());
- auto next = iter.getNext(operationContext()).get();
- ASSERT_BSONOBJ_EQ(getId(oplog2), getId(*next));
+ auto executor = makeTaskExecutorForIterator();
+ auto altClient = makeKillableClient();
+ AlternativeClientRegion acr(altClient);
- ASSERT_TRUE(iter.hasMore());
- next = iter.getNext(operationContext()).get();
- ASSERT_FALSE(next);
+ auto next = getNextBatch(&iter, executor);
+ ASSERT_EQ(next.size(), 1U);
+ ASSERT_BSONOBJ_EQ(getId(oplog2), getId(next[0]));
- ASSERT_FALSE(iter.hasMore());
+ next = getNextBatch(&iter, executor);
+ ASSERT_TRUE(next.empty());
}
TEST_F(ReshardingDonorOplogIterTest, ExhaustWithIncomingInserts) {
@@ -172,27 +235,60 @@ TEST_F(ReshardingDonorOplogIterTest, ExhaustWithIncomingInserts) {
const auto ns = oplogNss().ns();
client.insert(ns, oplog1.toBSON());
- ReshardingDonorOplogIterator iter(oplogNss(), boost::none, &onInsertAlwaysReady);
- ASSERT_TRUE(iter.hasMore());
- auto next = iter.getNext(operationContext()).get();
- ASSERT_BSONOBJ_EQ(getId(oplog1), getId(*next));
+ class InsertNotifier : public resharding::OnInsertAwaitable {
+ public:
+ using Callback = std::function<void(OperationContext*, size_t)>;
- ASSERT_TRUE(iter.hasMore());
+ InsertNotifier(ServiceContext* serviceContext, Callback onAwaitInsertCalled)
+ : _serviceContext(serviceContext), _onAwaitInsertCalled(onAwaitInsertCalled) {}
- client.insert(ns, oplog2.toBSON());
- client.insert(ns, finalOplog.toBSON());
- client.insert(ns, oplogBeyond.toBSON());
+ Future<void> awaitInsert(const ReshardingDonorOplogId& lastSeen) override {
+ ++numCalls;
+
+ auto client = _serviceContext->makeClient("onAwaitInsertCalled");
+ AlternativeClientRegion acr(client);
+ auto opCtx = cc().makeOperationContext();
+ _onAwaitInsertCalled(opCtx.get(), numCalls);
+
+ return Future<void>::makeReady();
+ }
+
+ size_t numCalls = 0;
+
+ private:
+ ServiceContext* _serviceContext;
+ Callback _onAwaitInsertCalled;
+ } insertNotifier{getServiceContext(), [&](OperationContext* opCtx, size_t numCalls) {
+ DBDirectClient client(opCtx);
+
+ if (numCalls == 1) {
+ client.insert(ns, oplog2.toBSON());
+ } else {
+ client.insert(ns, finalOplog.toBSON());
+ client.insert(ns, oplogBeyond.toBSON());
+ }
+ }};
+
+ ReshardingDonorOplogIterator iter(oplogNss(), kResumeFromBeginning, &insertNotifier);
+ auto executor = makeTaskExecutorForIterator();
+ auto altClient = makeKillableClient();
+ AlternativeClientRegion acr(altClient);
+
+ auto next = getNextBatch(&iter, executor);
+ ASSERT_EQ(next.size(), 1U);
+ ASSERT_BSONOBJ_EQ(getId(oplog1), getId(next[0]));
+
+ next = getNextBatch(&iter, executor);
+ ASSERT_EQ(next.size(), 1U);
+ ASSERT_BSONOBJ_EQ(getId(oplog2), getId(next[0]));
- next = iter.getNext(operationContext()).get();
- ASSERT_BSONOBJ_EQ(getId(oplog2), getId(*next));
+ next = getNextBatch(&iter, executor);
+ ASSERT_TRUE(next.empty());
- ASSERT_TRUE(iter.hasMore());
- next = iter.getNext(operationContext()).get();
- ASSERT_FALSE(next);
+ next = getNextBatch(&iter, executor);
+ ASSERT_TRUE(next.empty());
- ASSERT_FALSE(iter.hasMore());
- next = iter.getNext(operationContext()).get();
- ASSERT_FALSE(next);
+ ASSERT_EQ(insertNotifier.numCalls, 2U);
}
} // anonymous namespace
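
The updated tests all follow the same drain pattern: build the iterator with an explicit resume token, run it on a ThreadPoolTaskExecutor whose threads carry a Client, swap in a killable Client via AlternativeClientRegion, and call getNextBatch() until it yields an empty batch. The condensed sketch below reuses the fixture helpers introduced in the diff above (kResumeFromBeginning, makeTaskExecutorForIterator, makeKillableClient, getNextBatch, onInsertAlwaysReady); it is an illustrative sketch, not code from the commit, and the test name and omitted insert setup are hypothetical.

    TEST_F(ReshardingDonorOplogIterTest, DrainsUntilEmptyBatch) {
        // Resume from the very beginning of the buffered oplog, as BasicExhaust does.
        // (Inserting the buffered entries, including the final oplog entry, is omitted here.)
        ReshardingDonorOplogIterator iter(oplogNss(), kResumeFromBeginning, &onInsertAlwaysReady);

        auto executor = makeTaskExecutorForIterator();
        auto altClient = makeKillableClient();
        AlternativeClientRegion acr(altClient);

        size_t totalEntries = 0;
        while (true) {
            auto batch = getNextBatch(&iter, executor);
            if (batch.empty()) {
                // An empty batch signals that the final oplog entry has been reached.
                break;
            }
            totalEntries += batch.size();
        }

        // With gReshardingBatchLimitOperations pinned to 1 by the fixture, each non-empty
        // batch holds a single entry, so totalEntries equals the number of buffered
        // entries inserted before the final oplog entry.
    }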