diff options
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r-- | src/mongo/db/repl/base_cloner_test_fixture.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner.cpp | 34 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner_test.cpp | 155 |
3 files changed, 38 insertions, 167 deletions
diff --git a/src/mongo/db/repl/base_cloner_test_fixture.cpp b/src/mongo/db/repl/base_cloner_test_fixture.cpp index c38663a54cf..8fb69472329 100644 --- a/src/mongo/db/repl/base_cloner_test_fixture.cpp +++ b/src/mongo/db/repl/base_cloner_test_fixture.cpp @@ -104,17 +104,30 @@ BSONObj BaseClonerTest::createListIndexesResponse(CursorId cursorId, const BSONA return createListIndexesResponse(cursorId, specs, "firstBatch"); } +namespace { +struct EnsureClientHasBeenInitialized : public executor::ThreadPoolMock::Options { + EnsureClientHasBeenInitialized() : executor::ThreadPoolMock::Options() { + onCreateThread = []() { Client::initThread("CollectionClonerTestThread"); }; + } +}; +} // namespace + BaseClonerTest::BaseClonerTest() - : _mutex(), _setStatusCondition(), _status(getDetectableErrorStatus()) {} + : ThreadPoolExecutorTest(EnsureClientHasBeenInitialized()), + _mutex(), + _setStatusCondition(), + _status(getDetectableErrorStatus()) {} void BaseClonerTest::setUp() { executor::ThreadPoolExecutorTest::setUp(); clear(); launchExecutorThread(); + Client::initThread("CollectionClonerTest"); ThreadPool::Options options; options.minThreads = 1U; options.maxThreads = 1U; + options.onCreateThread = [](StringData threadName) { Client::initThread(threadName); }; dbWorkThreadPool = stdx::make_unique<ThreadPool>(options); dbWorkThreadPool->startup(); @@ -128,6 +141,7 @@ void BaseClonerTest::tearDown() { storageInterface.reset(); dbWorkThreadPool.reset(); + Client::releaseCurrent(); } void BaseClonerTest::clear() { diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index 8aee68a4c58..553cc1d20bf 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -248,8 +248,17 @@ void CollectionCloner::shutdown() { void CollectionCloner::_cancelRemainingWork_inlock() { if (_arm) { - Client::initThreadIfNotAlready(); - _killArmHandle = _arm->kill(cc().getOperationContext()); + // This method can be called from a callback from either a TaskExecutor or a TaskRunner. The + // TaskExecutor should never have an OperationContext attached to the Client, and the + // TaskRunner should always have an OperationContext attached. Unfortunately, we don't know + // which situation we're in, so have to handle both. + auto& client = cc(); + if (auto opCtx = client.getOperationContext()) { + _killArmHandle = _arm->kill(opCtx); + } else { + auto newOpCtx = client.makeOperationContext(); + _killArmHandle = _arm->kill(newOpCtx.get()); + } } _countScheduler.shutdown(); _listIndexesFetcher.shutdown(); @@ -628,9 +637,10 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa if (_collectionCloningBatchSize > 0) { armParams.setBatchSize(_collectionCloningBatchSize); } - Client::initThreadIfNotAlready(); - _arm = stdx::make_unique<AsyncResultsMerger>( - cc().getOperationContext(), _executor, std::move(armParams)); + auto opCtx = cc().makeOperationContext(); + _arm = stdx::make_unique<AsyncResultsMerger>(opCtx.get(), _executor, std::move(armParams)); + _arm->detachFromOperationContext(); + opCtx.reset(); // This completion guard invokes _finishCallback on destruction. auto cancelRemainingWorkInLock = [this]() { _cancelRemainingWork_inlock(); }; @@ -643,7 +653,6 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa // outside the mutex. This is a necessary condition to invoke _finishCallback. stdx::lock_guard<stdx::mutex> lock(_mutex); Status scheduleStatus = _scheduleNextARMResultsCallback(onCompletionGuard); - _arm->detachFromOperationContext(); if (!scheduleStatus.isOK()) { onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, scheduleStatus); return; @@ -667,9 +676,10 @@ StatusWith<std::vector<BSONElement>> CollectionCloner::_parseParallelCollectionS } Status CollectionCloner::_bufferNextBatchFromArm(WithLock lock) { - Client::initThreadIfNotAlready(); - auto opCtx = cc().getOperationContext(); - _arm->reattachToOperationContext(opCtx); + // We expect this callback to execute in a thread from a TaskExecutor which will not have an + // OperationContext populated. We must make one ourselves. + auto opCtx = cc().makeOperationContext(); + _arm->reattachToOperationContext(opCtx.get()); while (_arm->ready()) { auto armResultStatus = _arm->nextReady(); if (!armResultStatus.getStatus().isOK()) { @@ -690,8 +700,10 @@ Status CollectionCloner::_bufferNextBatchFromArm(WithLock lock) { Status CollectionCloner::_scheduleNextARMResultsCallback( std::shared_ptr<OnCompletionGuard> onCompletionGuard) { - Client::initThreadIfNotAlready(); - _arm->reattachToOperationContext(cc().getOperationContext()); + // We expect this callback to execute in a thread from a TaskExecutor which will not have an + // OperationContext populated. We must make one ourselves. + auto opCtx = cc().makeOperationContext(); + _arm->reattachToOperationContext(opCtx.get()); auto nextEvent = _arm->nextEvent(); _arm->detachFromOperationContext(); if (!nextEvent.isOK()) { diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp index 8322d29fdf3..9f7a31e6a93 100644 --- a/src/mongo/db/repl/collection_cloner_test.cpp +++ b/src/mongo/db/repl/collection_cloner_test.cpp @@ -1133,68 +1133,6 @@ TEST_F(CollectionClonerTest, LastBatchContainsNoDocuments) { ASSERT_FALSE(collectionCloner->isActive()); } -TEST_F(CollectionClonerTest, MiddleBatchContainsNoDocuments) { - ASSERT_OK(collectionCloner->startup()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCountResponse(0)); - processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); - } - ASSERT_TRUE(collectionCloner->isActive()); - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - ASSERT_TRUE(collectionStats.initCalled); - - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - - const BSONObj doc = BSON("_id" << 1); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, BSON_ARRAY(doc))); - } - - collectionCloner->waitForDbWorker(); - ASSERT_EQUALS(1, collectionStats.insertCount); - - ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray, "nextBatch")); - } - - collectionCloner->waitForDbWorker(); - ASSERT_EQUALS(1, collectionStats.insertCount); - - ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); - ASSERT_TRUE(collectionCloner->isActive()); - - const BSONObj doc2 = BSON("_id" << 2); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc2))); - } - - collectionCloner->join(); - - ASSERT_EQUALS(2, collectionStats.insertCount); - ASSERT_TRUE(collectionStats.commitCalled); - - ASSERT_OK(getStatus()); - ASSERT_FALSE(collectionCloner->isActive()); -} - TEST_F(CollectionClonerTest, CollectionClonerTransitionsToCompleteIfShutdownBeforeStartup) { collectionCloner->shutdown(); ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, collectionCloner->startup()); @@ -1975,99 +1913,6 @@ TEST_F(ParallelCollectionClonerTest, InsertDocumentsWithMultipleCursorsOfDiffere ASSERT_FALSE(collectionCloner->isActive()); } -TEST_F(ParallelCollectionClonerTest, MiddleBatchContainsNoDocumentsWithMultipleCursors) { - ASSERT_OK(collectionCloner->startup()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCountResponse(0)); - processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); - } - ASSERT_TRUE(collectionCloner->isActive()); - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - ASSERT_TRUE(collectionStats.initCalled); - - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray) - << createCursorResponse(2, emptyArray) - << createCursorResponse(3, emptyArray)) - << "ok" - << 1)); - } - collectionCloner->waitForDbWorker(); - - auto exec = &getExecutor(); - std::vector<BSONObj> docs; - // Record the buffered documents before they are inserted so we can - // validate them. - collectionCloner->setScheduleDbWorkFn_forTest( - [&](const executor::TaskExecutor::CallbackFn& workFn) { - auto buffered = collectionCloner->getDocumentsToInsert_forTest(); - docs.insert(docs.end(), buffered.begin(), buffered.end()); - return exec->scheduleWork(workFn); - }); - - ASSERT_TRUE(collectionCloner->isActive()); - - int numDocs = 6; - std::vector<BSONObj> generatedDocs = generateDocs(numDocs); - const BSONObj doc = BSON("_id" << 1); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, BSON_ARRAY(generatedDocs[0]))); - processNetworkResponse(createCursorResponse(2, BSON_ARRAY(generatedDocs[1]))); - processNetworkResponse(createCursorResponse(3, BSON_ARRAY(generatedDocs[2]))); - } - - collectionCloner->waitForDbWorker(); - ASSERT_EQUALS(3U, docs.size()); - for (int i = 0; i < 3; i++) { - ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]); - } - ASSERT_EQUALS(3, collectionStats.insertCount); - - ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray, "nextBatch")); - processNetworkResponse(createCursorResponse(2, emptyArray, "nextBatch")); - processNetworkResponse(createCursorResponse(3, emptyArray, "nextBatch")); - } - - collectionCloner->waitForDbWorker(); - ASSERT_EQUALS(3, collectionStats.insertCount); - - ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[3]))); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[4]))); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[5]))); - } - - collectionCloner->join(); - - ASSERT_EQUALS(6U, docs.size()); - for (int i = 3; i < 6; i++) { - ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]); - } - - ASSERT_EQUALS(numDocs, collectionStats.insertCount); - ASSERT_TRUE(collectionStats.commitCalled); - - ASSERT_OK(getStatus()); - ASSERT_FALSE(collectionCloner->isActive()); -} - TEST_F(ParallelCollectionClonerTest, LastBatchContainsNoDocumentsWithMultipleCursors) { ASSERT_OK(collectionCloner->startup()); ASSERT_TRUE(collectionCloner->isActive()); |