diff options
author | Ian Boros <ian.boros@10gen.com> | 2018-02-01 13:32:49 -0500 |
---|---|---|
committer | Ian Boros <ian.boros@10gen.com> | 2018-02-13 18:30:23 -0500 |
commit | 30e5dded30c373f9519e4c5347f7a92a9fa2d59f (patch) | |
tree | 5f99bc229e5befa0f1e8afbbfd61103fe9a28641 /src/mongo | |
parent | f8af7207e2359a119b29b6d4e4c6945cc0bf1a8a (diff) | |
download | mongo-30e5dded30c373f9519e4c5347f7a92a9fa2d59f.tar.gz |
SERVER-28090 add ability to interrupt pinned cursors on mongos
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/s/query/cluster_client_cursor_impl.cpp | 19 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_client_cursor_impl.h | 7 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_client_cursor_impl_test.cpp | 93 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_cursor_manager.cpp | 7 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_cursor_manager_test.cpp | 39 | ||||
-rw-r--r-- | src/mongo/s/query/router_exec_stage.h | 14 |
6 files changed, 145 insertions, 34 deletions
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp index 80e6a0b454f..daaf85a6f6f 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp @@ -73,16 +73,20 @@ ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx, executor::TaskExecutor* executor, ClusterClientCursorParams&& params, boost::optional<LogicalSessionId> lsid) - : _params(std::move(params)), _root(buildMergerPlan(opCtx, executor, &_params)), _lsid(lsid) { + : _params(std::move(params)), + _root(buildMergerPlan(opCtx, executor, &_params)), + _lsid(lsid), + _opCtx(opCtx) { dassert(!_params.compareWholeSortKey || SimpleBSONObjComparator::kInstance.evaluate( _params.sort == ClusterClientCursorParams::kWholeSortKeySortPattern)); } -ClusterClientCursorImpl::ClusterClientCursorImpl(std::unique_ptr<RouterStageMock> root, +ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx, + std::unique_ptr<RouterStageMock> root, ClusterClientCursorParams&& params, boost::optional<LogicalSessionId> lsid) - : _params(std::move(params)), _root(std::move(root)), _lsid(lsid) { + : _params(std::move(params)), _root(std::move(root)), _lsid(lsid), _opCtx(opCtx) { dassert(!_params.compareWholeSortKey || SimpleBSONObjComparator::kInstance.evaluate( _params.sort == ClusterClientCursorParams::kWholeSortKeySortPattern)); @@ -90,6 +94,13 @@ ClusterClientCursorImpl::ClusterClientCursorImpl(std::unique_ptr<RouterStageMock StatusWith<ClusterQueryResult> ClusterClientCursorImpl::next( RouterExecStage::ExecContext execContext) { + + invariant(_opCtx); + const auto interruptStatus = _opCtx->checkForInterruptNoAssert(); + if (!interruptStatus.isOK()) { + return interruptStatus; + } + // First return stashed results, if there are any. if (!_stash.empty()) { auto front = std::move(_stash.front()); @@ -110,10 +121,12 @@ void ClusterClientCursorImpl::kill(OperationContext* opCtx) { } void ClusterClientCursorImpl::reattachToOperationContext(OperationContext* opCtx) { + _opCtx = opCtx; _root->reattachToOperationContext(opCtx); } void ClusterClientCursorImpl::detachFromOperationContext() { + _opCtx = nullptr; _root->detachFromOperationContext(); } diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h index 879280e8189..88b4b25306e 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.h +++ b/src/mongo/s/query/cluster_client_cursor_impl.h @@ -118,7 +118,8 @@ public: /** * Constructs a CCC whose result set is generated by a mock execution stage. */ - ClusterClientCursorImpl(std::unique_ptr<RouterStageMock> root, + ClusterClientCursorImpl(OperationContext* opCtx, + std::unique_ptr<RouterStageMock> root, ClusterClientCursorParams&& params, boost::optional<LogicalSessionId> lsid); @@ -151,6 +152,10 @@ private: // Stores the logical session id for this cursor. boost::optional<LogicalSessionId> _lsid; + + // The OperationContext that we're executing within. This can be updated if necessary by using + // detachFromOperationContext() and reattachToOperationContext(). + OperationContext* _opCtx = nullptr; }; } // namespace mongo diff --git a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp index 8fa3d649e74..2db563fdd73 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp @@ -32,6 +32,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/service_context_noop.h" #include "mongo/s/query/router_stage_mock.h" #include "mongo/stdx/memory.h" #include "mongo/unittest/unittest.h" @@ -40,17 +41,37 @@ namespace mongo { namespace { -// These tests use RouterStageMock, which does not actually use its OperationContext, so rather than -// going through the trouble of making one, we'll just use nullptr throughout. -OperationContext* opCtx = nullptr; +class ClusterClientCursorImplTest : public unittest::Test { +protected: + ServiceContextNoop _serviceContext; + ServiceContext::UniqueOperationContext _opCtx; + Client* _client; + +private: + void setUp() final { + auto client = _serviceContext.makeClient("testClient"); + _opCtx = client->makeOperationContext(); + _client = client.get(); + Client::setCurrent(std::move(client)); + } + + void tearDown() final { + if (_opCtx) { + _opCtx.reset(); + } + + Client::releaseCurrent(); + } +}; -TEST(ClusterClientCursorImpl, NumReturnedSoFar) { - auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); +TEST_F(ClusterClientCursorImplTest, NumReturnedSoFar) { + auto mockStage = stdx::make_unique<RouterStageMock>(_opCtx.get()); for (int i = 1; i < 10; ++i) { mockStage->queueResult(BSON("a" << i)); } - ClusterClientCursorImpl cursor(std::move(mockStage), + ClusterClientCursorImpl cursor(_opCtx.get(), + std::move(mockStage), ClusterClientCursorParams(NamespaceString("unused"), {}), boost::none); @@ -69,12 +90,13 @@ TEST(ClusterClientCursorImpl, NumReturnedSoFar) { ASSERT_EQ(cursor.getNumReturnedSoFar(), 9LL); } -TEST(ClusterClientCursorImpl, QueueResult) { - auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); +TEST_F(ClusterClientCursorImplTest, QueueResult) { + auto mockStage = stdx::make_unique<RouterStageMock>(_opCtx.get()); mockStage->queueResult(BSON("a" << 1)); mockStage->queueResult(BSON("a" << 4)); - ClusterClientCursorImpl cursor(std::move(mockStage), + ClusterClientCursorImpl cursor(_opCtx.get(), + std::move(mockStage), ClusterClientCursorParams(NamespaceString("unused"), {}), boost::none); @@ -108,13 +130,14 @@ TEST(ClusterClientCursorImpl, QueueResult) { ASSERT_EQ(cursor.getNumReturnedSoFar(), 4LL); } -TEST(ClusterClientCursorImpl, RemotesExhausted) { - auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); +TEST_F(ClusterClientCursorImplTest, RemotesExhausted) { + auto mockStage = stdx::make_unique<RouterStageMock>(_opCtx.get()); mockStage->queueResult(BSON("a" << 1)); mockStage->queueResult(BSON("a" << 2)); mockStage->markRemotesExhausted(); - ClusterClientCursorImpl cursor(std::move(mockStage), + ClusterClientCursorImpl cursor(_opCtx.get(), + std::move(mockStage), ClusterClientCursorParams(NamespaceString("unused"), {}), boost::none); ASSERT_TRUE(cursor.remotesExhausted()); @@ -139,12 +162,13 @@ TEST(ClusterClientCursorImpl, RemotesExhausted) { ASSERT_EQ(cursor.getNumReturnedSoFar(), 2LL); } -TEST(ClusterClientCursorImpl, ForwardsAwaitDataTimeout) { - auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); +TEST_F(ClusterClientCursorImplTest, ForwardsAwaitDataTimeout) { + auto mockStage = stdx::make_unique<RouterStageMock>(_opCtx.get()); auto mockStagePtr = mockStage.get(); ASSERT_NOT_OK(mockStage->getAwaitDataTimeout().getStatus()); - ClusterClientCursorImpl cursor(std::move(mockStage), + ClusterClientCursorImpl cursor(_opCtx.get(), + std::move(mockStage), ClusterClientCursorParams(NamespaceString("unused"), {}), boost::none); ASSERT_OK(cursor.setAwaitDataTimeout(Milliseconds(789))); @@ -154,18 +178,47 @@ TEST(ClusterClientCursorImpl, ForwardsAwaitDataTimeout) { ASSERT_EQ(789, durationCount<Milliseconds>(awaitDataTimeout.getValue())); } -TEST(ClusterClientCursorImpl, LogicalSessionIdsOnCursors) { +TEST_F(ClusterClientCursorImplTest, ChecksForInterrupt) { + auto mockStage = stdx::make_unique<RouterStageMock>(nullptr); + for (int i = 1; i < 2; ++i) { + mockStage->queueResult(BSON("a" << i)); + } + + ClusterClientCursorImpl cursor(_opCtx.get(), + std::move(mockStage), + ClusterClientCursorParams(NamespaceString("unused"), {}), + boost::none); + + // Pull one result out of the cursor. + auto result = cursor.next(RouterExecStage::ExecContext::kInitialFind); + ASSERT(result.isOK()); + ASSERT_BSONOBJ_EQ(*result.getValue().getResult(), BSON("a" << 1)); + + // Now interrupt the opCtx which the cursor is running under. + { + stdx::lock_guard<Client> lk(*_client); + _opCtx->markKilled(ErrorCodes::CursorKilled); + } + + // Now check that a subsequent call to next() will fail. + result = cursor.next(RouterExecStage::ExecContext::kInitialFind); + ASSERT_NOT_OK(result.getStatus()); + ASSERT_EQ(result.getStatus(), ErrorCodes::CursorKilled); +} + +TEST_F(ClusterClientCursorImplTest, LogicalSessionIdsOnCursors) { // Make a cursor with no lsid - auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); + auto mockStage = stdx::make_unique<RouterStageMock>(_opCtx.get()); ClusterClientCursorParams params(NamespaceString("test"), {}); - ClusterClientCursorImpl cursor{std::move(mockStage), std::move(params), boost::none}; + ClusterClientCursorImpl cursor{ + _opCtx.get(), std::move(mockStage), std::move(params), boost::none}; ASSERT(!cursor.getLsid()); // Make a cursor with an lsid - auto mockStage2 = stdx::make_unique<RouterStageMock>(opCtx); + auto mockStage2 = stdx::make_unique<RouterStageMock>(_opCtx.get()); ClusterClientCursorParams params2(NamespaceString("test"), {}); auto lsid = makeLogicalSessionIdForTest(); - ClusterClientCursorImpl cursor2{std::move(mockStage2), std::move(params2), lsid}; + ClusterClientCursorImpl cursor2{_opCtx.get(), std::move(mockStage2), std::move(params2), lsid}; ASSERT(*(cursor2.getLsid()) == lsid); } diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp index 92f6f76f29c..20a84582c7f 100644 --- a/src/mongo/s/query/cluster_cursor_manager.cpp +++ b/src/mongo/s/query/cluster_cursor_manager.cpp @@ -389,6 +389,13 @@ Status ClusterCursorManager::killCursor(const NamespaceString& nss, CursorId cur return cursorNotFoundStatus(nss, cursorId); } + // Interrupt any operation currently using the cursor. + OperationContext* opUsingCursor = entry->getOperationUsingCursor(); + if (opUsingCursor) { + stdx::lock_guard<Client> lk(*opUsingCursor->getClient()); + opUsingCursor->getServiceContext()->killOperation(opUsingCursor, ErrorCodes::CursorKilled); + } + entry->setKillPending(); return Status::OK(); diff --git a/src/mongo/s/query/cluster_cursor_manager_test.cpp b/src/mongo/s/query/cluster_cursor_manager_test.cpp index 626f585144b..0151800a799 100644 --- a/src/mongo/s/query/cluster_cursor_manager_test.cpp +++ b/src/mongo/s/query/cluster_cursor_manager_test.cpp @@ -104,6 +104,29 @@ protected: return *std::next(_cursorKilledFlags.begin(), i); } + void killCursorFromDifferentOpCtx(const NamespaceString& nss, CursorId cursorId) { + // Set up another client to kill the cursor. + auto killCursorClientOwned = serviceContext.makeClient("killCursorClient"); + // Keep around a raw pointer for when we transfer ownership of killingClientOwned to the + // global current client. + Client* killCursorClient = killCursorClientOwned.get(); + + // Need to swap the current client in order to make an operation context. + auto pinningClient = Client::releaseCurrent(); + Client::setCurrent(std::move(killCursorClientOwned)); + + auto killCursorOpCtx = killCursorClient->makeOperationContext(); + invariant(killCursorOpCtx); + ASSERT_OK(getManager()->killCursor(nss, cursorId)); + + // Restore the old client. We don't need 'killCursorClient' anymore. + killCursorOpCtx.reset(); + Client::releaseCurrent(); + + Client::setCurrent(std::move(pinningClient)); + } + + private: void setUp() final { auto client = serviceContext.makeClient("testClient"); @@ -368,7 +391,11 @@ TEST_F(ClusterCursorManagerTest, KillCursorBasic) { auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); ASSERT_OK(pinnedCursor.getStatus()); - ASSERT_OK(getManager()->killCursor(nss, pinnedCursor.getValue().getCursorId())); + killCursorFromDifferentOpCtx(nss, pinnedCursor.getValue().getCursorId()); + + // When the cursor is pinned the operation which checked out the cursor should be interrupted. + ASSERT_EQ(_opCtx->checkForInterruptNoAssert(), ErrorCodes::CursorKilled); + pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted); ASSERT(!isMockCursorKilled(0)); getManager()->reapZombieCursors(nullptr); @@ -712,7 +739,10 @@ TEST_F(ClusterCursorManagerTest, StatsKillPinnedCursor) { auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); ASSERT_EQ(1U, getManager()->stats().cursorsPinned); - ASSERT_OK(getManager()->killCursor(nss, cursorId)); + + killCursorFromDifferentOpCtx(nss, cursorId); + + ASSERT_EQ(_opCtx->checkForInterruptNoAssert(), ErrorCodes::CursorKilled); ASSERT_EQ(0U, getManager()->stats().cursorsPinned); } @@ -1016,7 +1046,10 @@ TEST_F(ClusterCursorManagerTest, DoNotReapKilledPinnedCursors) { auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); ASSERT_OK(pinnedCursor.getStatus()); - ASSERT_OK(getManager()->killCursor(nss, cursorId)); + + killCursorFromDifferentOpCtx(nss, cursorId); + + ASSERT_EQ(_opCtx->checkForInterruptNoAssert(), ErrorCodes::CursorKilled); ASSERT(!isMockCursorKilled(0)); // Pinned cursor should remain alive after reaping. diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h index 5690772dc2b..418419fdbef 100644 --- a/src/mongo/s/query/router_exec_stage.h +++ b/src/mongo/s/query/router_exec_stage.h @@ -141,6 +141,13 @@ public: doDetachFromOperationContext(); } + /** + * Returns a pointer to the current OperationContext, or nullptr if there is no context. + */ + OperationContext* getOpCtx() { + return _opCtx; + } + protected: /** * Performs any stage-specific reattach actions. Called after the OperationContext has been set @@ -168,13 +175,6 @@ protected: return _child.get(); } - /** - * Returns a pointer to the current OperationContext, or nullptr if there is no context. - */ - OperationContext* getOpCtx() { - return _opCtx; - } - private: OperationContext* _opCtx = nullptr; std::unique_ptr<RouterExecStage> _child; |