summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--src/mongo/db/repl/base_cloner_test_fixture.cpp16
-rw-r--r--src/mongo/db/repl/collection_cloner.cpp34
-rw-r--r--src/mongo/db/repl/collection_cloner_test.cpp155
3 files changed, 38 insertions, 167 deletions
diff --git a/src/mongo/db/repl/base_cloner_test_fixture.cpp b/src/mongo/db/repl/base_cloner_test_fixture.cpp
index c38663a54cf..8fb69472329 100644
--- a/src/mongo/db/repl/base_cloner_test_fixture.cpp
+++ b/src/mongo/db/repl/base_cloner_test_fixture.cpp
@@ -104,17 +104,30 @@ BSONObj BaseClonerTest::createListIndexesResponse(CursorId cursorId, const BSONA
return createListIndexesResponse(cursorId, specs, "firstBatch");
}
+namespace {
+struct EnsureClientHasBeenInitialized : public executor::ThreadPoolMock::Options {
+ EnsureClientHasBeenInitialized() : executor::ThreadPoolMock::Options() {
+ onCreateThread = []() { Client::initThread("CollectionClonerTestThread"); };
+ }
+};
+} // namespace
+
BaseClonerTest::BaseClonerTest()
- : _mutex(), _setStatusCondition(), _status(getDetectableErrorStatus()) {}
+ : ThreadPoolExecutorTest(EnsureClientHasBeenInitialized()),
+ _mutex(),
+ _setStatusCondition(),
+ _status(getDetectableErrorStatus()) {}
void BaseClonerTest::setUp() {
executor::ThreadPoolExecutorTest::setUp();
clear();
launchExecutorThread();
+ Client::initThread("CollectionClonerTest");
ThreadPool::Options options;
options.minThreads = 1U;
options.maxThreads = 1U;
+ options.onCreateThread = [](StringData threadName) { Client::initThread(threadName); };
dbWorkThreadPool = stdx::make_unique<ThreadPool>(options);
dbWorkThreadPool->startup();
@@ -128,6 +141,7 @@ void BaseClonerTest::tearDown() {
storageInterface.reset();
dbWorkThreadPool.reset();
+ Client::releaseCurrent();
}
void BaseClonerTest::clear() {
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index 8aee68a4c58..553cc1d20bf 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -248,8 +248,17 @@ void CollectionCloner::shutdown() {
void CollectionCloner::_cancelRemainingWork_inlock() {
if (_arm) {
- Client::initThreadIfNotAlready();
- _killArmHandle = _arm->kill(cc().getOperationContext());
+ // This method can be called from a callback from either a TaskExecutor or a TaskRunner. The
+ // TaskExecutor should never have an OperationContext attached to the Client, and the
+ // TaskRunner should always have an OperationContext attached. Unfortunately, we don't know
+ // which situation we're in, so have to handle both.
+ auto& client = cc();
+ if (auto opCtx = client.getOperationContext()) {
+ _killArmHandle = _arm->kill(opCtx);
+ } else {
+ auto newOpCtx = client.makeOperationContext();
+ _killArmHandle = _arm->kill(newOpCtx.get());
+ }
}
_countScheduler.shutdown();
_listIndexesFetcher.shutdown();
@@ -628,9 +637,10 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa
if (_collectionCloningBatchSize > 0) {
armParams.setBatchSize(_collectionCloningBatchSize);
}
- Client::initThreadIfNotAlready();
- _arm = stdx::make_unique<AsyncResultsMerger>(
- cc().getOperationContext(), _executor, std::move(armParams));
+ auto opCtx = cc().makeOperationContext();
+ _arm = stdx::make_unique<AsyncResultsMerger>(opCtx.get(), _executor, std::move(armParams));
+ _arm->detachFromOperationContext();
+ opCtx.reset();
// This completion guard invokes _finishCallback on destruction.
auto cancelRemainingWorkInLock = [this]() { _cancelRemainingWork_inlock(); };
@@ -643,7 +653,6 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa
// outside the mutex. This is a necessary condition to invoke _finishCallback.
stdx::lock_guard<stdx::mutex> lock(_mutex);
Status scheduleStatus = _scheduleNextARMResultsCallback(onCompletionGuard);
- _arm->detachFromOperationContext();
if (!scheduleStatus.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, scheduleStatus);
return;
@@ -667,9 +676,10 @@ StatusWith<std::vector<BSONElement>> CollectionCloner::_parseParallelCollectionS
}
Status CollectionCloner::_bufferNextBatchFromArm(WithLock lock) {
- Client::initThreadIfNotAlready();
- auto opCtx = cc().getOperationContext();
- _arm->reattachToOperationContext(opCtx);
+ // We expect this callback to execute in a thread from a TaskExecutor which will not have an
+ // OperationContext populated. We must make one ourselves.
+ auto opCtx = cc().makeOperationContext();
+ _arm->reattachToOperationContext(opCtx.get());
while (_arm->ready()) {
auto armResultStatus = _arm->nextReady();
if (!armResultStatus.getStatus().isOK()) {
@@ -690,8 +700,10 @@ Status CollectionCloner::_bufferNextBatchFromArm(WithLock lock) {
Status CollectionCloner::_scheduleNextARMResultsCallback(
std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
- Client::initThreadIfNotAlready();
- _arm->reattachToOperationContext(cc().getOperationContext());
+ // We expect this callback to execute in a thread from a TaskExecutor which will not have an
+ // OperationContext populated. We must make one ourselves.
+ auto opCtx = cc().makeOperationContext();
+ _arm->reattachToOperationContext(opCtx.get());
auto nextEvent = _arm->nextEvent();
_arm->detachFromOperationContext();
if (!nextEvent.isOK()) {
diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp
index 8322d29fdf3..9f7a31e6a93 100644
--- a/src/mongo/db/repl/collection_cloner_test.cpp
+++ b/src/mongo/db/repl/collection_cloner_test.cpp
@@ -1133,68 +1133,6 @@ TEST_F(CollectionClonerTest, LastBatchContainsNoDocuments) {
ASSERT_FALSE(collectionCloner->isActive());
}
-TEST_F(CollectionClonerTest, MiddleBatchContainsNoDocuments) {
- ASSERT_OK(collectionCloner->startup());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
- processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
- }
- ASSERT_TRUE(collectionCloner->isActive());
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
- ASSERT_TRUE(collectionStats.initCalled);
-
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
- const BSONObj doc = BSON("_id" << 1);
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, BSON_ARRAY(doc)));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(1, collectionStats.insertCount);
-
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray, "nextBatch"));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(1, collectionStats.insertCount);
-
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
- ASSERT_TRUE(collectionCloner->isActive());
-
- const BSONObj doc2 = BSON("_id" << 2);
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc2)));
- }
-
- collectionCloner->join();
-
- ASSERT_EQUALS(2, collectionStats.insertCount);
- ASSERT_TRUE(collectionStats.commitCalled);
-
- ASSERT_OK(getStatus());
- ASSERT_FALSE(collectionCloner->isActive());
-}
-
TEST_F(CollectionClonerTest, CollectionClonerTransitionsToCompleteIfShutdownBeforeStartup) {
collectionCloner->shutdown();
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, collectionCloner->startup());
@@ -1975,99 +1913,6 @@ TEST_F(ParallelCollectionClonerTest, InsertDocumentsWithMultipleCursorsOfDiffere
ASSERT_FALSE(collectionCloner->isActive());
}
-TEST_F(ParallelCollectionClonerTest, MiddleBatchContainsNoDocumentsWithMultipleCursors) {
- ASSERT_OK(collectionCloner->startup());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
- processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
- }
- ASSERT_TRUE(collectionCloner->isActive());
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
- ASSERT_TRUE(collectionStats.initCalled);
-
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray)
- << createCursorResponse(2, emptyArray)
- << createCursorResponse(3, emptyArray))
- << "ok"
- << 1));
- }
- collectionCloner->waitForDbWorker();
-
- auto exec = &getExecutor();
- std::vector<BSONObj> docs;
- // Record the buffered documents before they are inserted so we can
- // validate them.
- collectionCloner->setScheduleDbWorkFn_forTest(
- [&](const executor::TaskExecutor::CallbackFn& workFn) {
- auto buffered = collectionCloner->getDocumentsToInsert_forTest();
- docs.insert(docs.end(), buffered.begin(), buffered.end());
- return exec->scheduleWork(workFn);
- });
-
- ASSERT_TRUE(collectionCloner->isActive());
-
- int numDocs = 6;
- std::vector<BSONObj> generatedDocs = generateDocs(numDocs);
- const BSONObj doc = BSON("_id" << 1);
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, BSON_ARRAY(generatedDocs[0])));
- processNetworkResponse(createCursorResponse(2, BSON_ARRAY(generatedDocs[1])));
- processNetworkResponse(createCursorResponse(3, BSON_ARRAY(generatedDocs[2])));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(3U, docs.size());
- for (int i = 0; i < 3; i++) {
- ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]);
- }
- ASSERT_EQUALS(3, collectionStats.insertCount);
-
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray, "nextBatch"));
- processNetworkResponse(createCursorResponse(2, emptyArray, "nextBatch"));
- processNetworkResponse(createCursorResponse(3, emptyArray, "nextBatch"));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(3, collectionStats.insertCount);
-
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[3])));
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[4])));
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[5])));
- }
-
- collectionCloner->join();
-
- ASSERT_EQUALS(6U, docs.size());
- for (int i = 3; i < 6; i++) {
- ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]);
- }
-
- ASSERT_EQUALS(numDocs, collectionStats.insertCount);
- ASSERT_TRUE(collectionStats.commitCalled);
-
- ASSERT_OK(getStatus());
- ASSERT_FALSE(collectionCloner->isActive());
-}
-
TEST_F(ParallelCollectionClonerTest, LastBatchContainsNoDocumentsWithMultipleCursors) {
ASSERT_OK(collectionCloner->startup());
ASSERT_TRUE(collectionCloner->isActive());