summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorIan Boros <ian.boros@10gen.com>2018-02-01 13:32:49 -0500
committerIan Boros <ian.boros@10gen.com>2018-02-13 18:30:23 -0500
commit30e5dded30c373f9519e4c5347f7a92a9fa2d59f (patch)
tree5f99bc229e5befa0f1e8afbbfd61103fe9a28641 /src/mongo
parentf8af7207e2359a119b29b6d4e4c6945cc0bf1a8a (diff)
downloadmongo-30e5dded30c373f9519e4c5347f7a92a9fa2d59f.tar.gz
SERVER-28090 add ability to interrupt pinned cursors on mongos
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.cpp19
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.h7
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl_test.cpp93
-rw-r--r--src/mongo/s/query/cluster_cursor_manager.cpp7
-rw-r--r--src/mongo/s/query/cluster_cursor_manager_test.cpp39
-rw-r--r--src/mongo/s/query/router_exec_stage.h14
6 files changed, 145 insertions, 34 deletions
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index 80e6a0b454f..daaf85a6f6f 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -73,16 +73,20 @@ ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx,
executor::TaskExecutor* executor,
ClusterClientCursorParams&& params,
boost::optional<LogicalSessionId> lsid)
- : _params(std::move(params)), _root(buildMergerPlan(opCtx, executor, &_params)), _lsid(lsid) {
+ : _params(std::move(params)),
+ _root(buildMergerPlan(opCtx, executor, &_params)),
+ _lsid(lsid),
+ _opCtx(opCtx) {
dassert(!_params.compareWholeSortKey ||
SimpleBSONObjComparator::kInstance.evaluate(
_params.sort == ClusterClientCursorParams::kWholeSortKeySortPattern));
}
-ClusterClientCursorImpl::ClusterClientCursorImpl(std::unique_ptr<RouterStageMock> root,
+ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx,
+ std::unique_ptr<RouterStageMock> root,
ClusterClientCursorParams&& params,
boost::optional<LogicalSessionId> lsid)
- : _params(std::move(params)), _root(std::move(root)), _lsid(lsid) {
+ : _params(std::move(params)), _root(std::move(root)), _lsid(lsid), _opCtx(opCtx) {
dassert(!_params.compareWholeSortKey ||
SimpleBSONObjComparator::kInstance.evaluate(
_params.sort == ClusterClientCursorParams::kWholeSortKeySortPattern));
@@ -90,6 +94,13 @@ ClusterClientCursorImpl::ClusterClientCursorImpl(std::unique_ptr<RouterStageMock
StatusWith<ClusterQueryResult> ClusterClientCursorImpl::next(
RouterExecStage::ExecContext execContext) {
+
+ invariant(_opCtx);
+ const auto interruptStatus = _opCtx->checkForInterruptNoAssert();
+ if (!interruptStatus.isOK()) {
+ return interruptStatus;
+ }
+
// First return stashed results, if there are any.
if (!_stash.empty()) {
auto front = std::move(_stash.front());
@@ -110,10 +121,12 @@ void ClusterClientCursorImpl::kill(OperationContext* opCtx) {
}
void ClusterClientCursorImpl::reattachToOperationContext(OperationContext* opCtx) {
+ _opCtx = opCtx;
_root->reattachToOperationContext(opCtx);
}
void ClusterClientCursorImpl::detachFromOperationContext() {
+ _opCtx = nullptr;
_root->detachFromOperationContext();
}
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h
index 879280e8189..88b4b25306e 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.h
+++ b/src/mongo/s/query/cluster_client_cursor_impl.h
@@ -118,7 +118,8 @@ public:
/**
* Constructs a CCC whose result set is generated by a mock execution stage.
*/
- ClusterClientCursorImpl(std::unique_ptr<RouterStageMock> root,
+ ClusterClientCursorImpl(OperationContext* opCtx,
+ std::unique_ptr<RouterStageMock> root,
ClusterClientCursorParams&& params,
boost::optional<LogicalSessionId> lsid);
@@ -151,6 +152,10 @@ private:
// Stores the logical session id for this cursor.
boost::optional<LogicalSessionId> _lsid;
+
+ // The OperationContext that we're executing within. This can be updated if necessary by using
+ // detachFromOperationContext() and reattachToOperationContext().
+ OperationContext* _opCtx = nullptr;
};
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp
index 8fa3d649e74..2db563fdd73 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp
@@ -32,6 +32,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/service_context_noop.h"
#include "mongo/s/query/router_stage_mock.h"
#include "mongo/stdx/memory.h"
#include "mongo/unittest/unittest.h"
@@ -40,17 +41,37 @@ namespace mongo {
namespace {
-// These tests use RouterStageMock, which does not actually use its OperationContext, so rather than
-// going through the trouble of making one, we'll just use nullptr throughout.
-OperationContext* opCtx = nullptr;
+class ClusterClientCursorImplTest : public unittest::Test {
+protected:
+ ServiceContextNoop _serviceContext;
+ ServiceContext::UniqueOperationContext _opCtx;
+ Client* _client;
+
+private:
+ void setUp() final {
+ auto client = _serviceContext.makeClient("testClient");
+ _opCtx = client->makeOperationContext();
+ _client = client.get();
+ Client::setCurrent(std::move(client));
+ }
+
+ void tearDown() final {
+ if (_opCtx) {
+ _opCtx.reset();
+ }
+
+ Client::releaseCurrent();
+ }
+};
-TEST(ClusterClientCursorImpl, NumReturnedSoFar) {
- auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
+TEST_F(ClusterClientCursorImplTest, NumReturnedSoFar) {
+ auto mockStage = stdx::make_unique<RouterStageMock>(_opCtx.get());
for (int i = 1; i < 10; ++i) {
mockStage->queueResult(BSON("a" << i));
}
- ClusterClientCursorImpl cursor(std::move(mockStage),
+ ClusterClientCursorImpl cursor(_opCtx.get(),
+ std::move(mockStage),
ClusterClientCursorParams(NamespaceString("unused"), {}),
boost::none);
@@ -69,12 +90,13 @@ TEST(ClusterClientCursorImpl, NumReturnedSoFar) {
ASSERT_EQ(cursor.getNumReturnedSoFar(), 9LL);
}
-TEST(ClusterClientCursorImpl, QueueResult) {
- auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
+TEST_F(ClusterClientCursorImplTest, QueueResult) {
+ auto mockStage = stdx::make_unique<RouterStageMock>(_opCtx.get());
mockStage->queueResult(BSON("a" << 1));
mockStage->queueResult(BSON("a" << 4));
- ClusterClientCursorImpl cursor(std::move(mockStage),
+ ClusterClientCursorImpl cursor(_opCtx.get(),
+ std::move(mockStage),
ClusterClientCursorParams(NamespaceString("unused"), {}),
boost::none);
@@ -108,13 +130,14 @@ TEST(ClusterClientCursorImpl, QueueResult) {
ASSERT_EQ(cursor.getNumReturnedSoFar(), 4LL);
}
-TEST(ClusterClientCursorImpl, RemotesExhausted) {
- auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
+TEST_F(ClusterClientCursorImplTest, RemotesExhausted) {
+ auto mockStage = stdx::make_unique<RouterStageMock>(_opCtx.get());
mockStage->queueResult(BSON("a" << 1));
mockStage->queueResult(BSON("a" << 2));
mockStage->markRemotesExhausted();
- ClusterClientCursorImpl cursor(std::move(mockStage),
+ ClusterClientCursorImpl cursor(_opCtx.get(),
+ std::move(mockStage),
ClusterClientCursorParams(NamespaceString("unused"), {}),
boost::none);
ASSERT_TRUE(cursor.remotesExhausted());
@@ -139,12 +162,13 @@ TEST(ClusterClientCursorImpl, RemotesExhausted) {
ASSERT_EQ(cursor.getNumReturnedSoFar(), 2LL);
}
-TEST(ClusterClientCursorImpl, ForwardsAwaitDataTimeout) {
- auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
+TEST_F(ClusterClientCursorImplTest, ForwardsAwaitDataTimeout) {
+ auto mockStage = stdx::make_unique<RouterStageMock>(_opCtx.get());
auto mockStagePtr = mockStage.get();
ASSERT_NOT_OK(mockStage->getAwaitDataTimeout().getStatus());
- ClusterClientCursorImpl cursor(std::move(mockStage),
+ ClusterClientCursorImpl cursor(_opCtx.get(),
+ std::move(mockStage),
ClusterClientCursorParams(NamespaceString("unused"), {}),
boost::none);
ASSERT_OK(cursor.setAwaitDataTimeout(Milliseconds(789)));
@@ -154,18 +178,47 @@ TEST(ClusterClientCursorImpl, ForwardsAwaitDataTimeout) {
ASSERT_EQ(789, durationCount<Milliseconds>(awaitDataTimeout.getValue()));
}
-TEST(ClusterClientCursorImpl, LogicalSessionIdsOnCursors) {
+TEST_F(ClusterClientCursorImplTest, ChecksForInterrupt) {
+ auto mockStage = stdx::make_unique<RouterStageMock>(nullptr);
+ for (int i = 1; i < 2; ++i) {
+ mockStage->queueResult(BSON("a" << i));
+ }
+
+ ClusterClientCursorImpl cursor(_opCtx.get(),
+ std::move(mockStage),
+ ClusterClientCursorParams(NamespaceString("unused"), {}),
+ boost::none);
+
+ // Pull one result out of the cursor.
+ auto result = cursor.next(RouterExecStage::ExecContext::kInitialFind);
+ ASSERT(result.isOK());
+ ASSERT_BSONOBJ_EQ(*result.getValue().getResult(), BSON("a" << 1));
+
+ // Now interrupt the opCtx which the cursor is running under.
+ {
+ stdx::lock_guard<Client> lk(*_client);
+ _opCtx->markKilled(ErrorCodes::CursorKilled);
+ }
+
+ // Now check that a subsequent call to next() will fail.
+ result = cursor.next(RouterExecStage::ExecContext::kInitialFind);
+ ASSERT_NOT_OK(result.getStatus());
+ ASSERT_EQ(result.getStatus(), ErrorCodes::CursorKilled);
+}
+
+TEST_F(ClusterClientCursorImplTest, LogicalSessionIdsOnCursors) {
// Make a cursor with no lsid
- auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
+ auto mockStage = stdx::make_unique<RouterStageMock>(_opCtx.get());
ClusterClientCursorParams params(NamespaceString("test"), {});
- ClusterClientCursorImpl cursor{std::move(mockStage), std::move(params), boost::none};
+ ClusterClientCursorImpl cursor{
+ _opCtx.get(), std::move(mockStage), std::move(params), boost::none};
ASSERT(!cursor.getLsid());
// Make a cursor with an lsid
- auto mockStage2 = stdx::make_unique<RouterStageMock>(opCtx);
+ auto mockStage2 = stdx::make_unique<RouterStageMock>(_opCtx.get());
ClusterClientCursorParams params2(NamespaceString("test"), {});
auto lsid = makeLogicalSessionIdForTest();
- ClusterClientCursorImpl cursor2{std::move(mockStage2), std::move(params2), lsid};
+ ClusterClientCursorImpl cursor2{_opCtx.get(), std::move(mockStage2), std::move(params2), lsid};
ASSERT(*(cursor2.getLsid()) == lsid);
}
diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp
index 92f6f76f29c..20a84582c7f 100644
--- a/src/mongo/s/query/cluster_cursor_manager.cpp
+++ b/src/mongo/s/query/cluster_cursor_manager.cpp
@@ -389,6 +389,13 @@ Status ClusterCursorManager::killCursor(const NamespaceString& nss, CursorId cur
return cursorNotFoundStatus(nss, cursorId);
}
+ // Interrupt any operation currently using the cursor.
+ OperationContext* opUsingCursor = entry->getOperationUsingCursor();
+ if (opUsingCursor) {
+ stdx::lock_guard<Client> lk(*opUsingCursor->getClient());
+ opUsingCursor->getServiceContext()->killOperation(opUsingCursor, ErrorCodes::CursorKilled);
+ }
+
entry->setKillPending();
return Status::OK();
diff --git a/src/mongo/s/query/cluster_cursor_manager_test.cpp b/src/mongo/s/query/cluster_cursor_manager_test.cpp
index 626f585144b..0151800a799 100644
--- a/src/mongo/s/query/cluster_cursor_manager_test.cpp
+++ b/src/mongo/s/query/cluster_cursor_manager_test.cpp
@@ -104,6 +104,29 @@ protected:
return *std::next(_cursorKilledFlags.begin(), i);
}
+ void killCursorFromDifferentOpCtx(const NamespaceString& nss, CursorId cursorId) {
+ // Set up another client to kill the cursor.
+ auto killCursorClientOwned = serviceContext.makeClient("killCursorClient");
+ // Keep around a raw pointer for when we transfer ownership of killingClientOwned to the
+ // global current client.
+ Client* killCursorClient = killCursorClientOwned.get();
+
+ // Need to swap the current client in order to make an operation context.
+ auto pinningClient = Client::releaseCurrent();
+ Client::setCurrent(std::move(killCursorClientOwned));
+
+ auto killCursorOpCtx = killCursorClient->makeOperationContext();
+ invariant(killCursorOpCtx);
+ ASSERT_OK(getManager()->killCursor(nss, cursorId));
+
+ // Restore the old client. We don't need 'killCursorClient' anymore.
+ killCursorOpCtx.reset();
+ Client::releaseCurrent();
+
+ Client::setCurrent(std::move(pinningClient));
+ }
+
+
private:
void setUp() final {
auto client = serviceContext.makeClient("testClient");
@@ -368,7 +391,11 @@ TEST_F(ClusterCursorManagerTest, KillCursorBasic) {
auto pinnedCursor =
getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker);
ASSERT_OK(pinnedCursor.getStatus());
- ASSERT_OK(getManager()->killCursor(nss, pinnedCursor.getValue().getCursorId()));
+ killCursorFromDifferentOpCtx(nss, pinnedCursor.getValue().getCursorId());
+
+ // When the cursor is pinned the operation which checked out the cursor should be interrupted.
+ ASSERT_EQ(_opCtx->checkForInterruptNoAssert(), ErrorCodes::CursorKilled);
+
pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted);
ASSERT(!isMockCursorKilled(0));
getManager()->reapZombieCursors(nullptr);
@@ -712,7 +739,10 @@ TEST_F(ClusterCursorManagerTest, StatsKillPinnedCursor) {
auto pinnedCursor =
getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker);
ASSERT_EQ(1U, getManager()->stats().cursorsPinned);
- ASSERT_OK(getManager()->killCursor(nss, cursorId));
+
+ killCursorFromDifferentOpCtx(nss, cursorId);
+
+ ASSERT_EQ(_opCtx->checkForInterruptNoAssert(), ErrorCodes::CursorKilled);
ASSERT_EQ(0U, getManager()->stats().cursorsPinned);
}
@@ -1016,7 +1046,10 @@ TEST_F(ClusterCursorManagerTest, DoNotReapKilledPinnedCursors) {
auto pinnedCursor =
getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker);
ASSERT_OK(pinnedCursor.getStatus());
- ASSERT_OK(getManager()->killCursor(nss, cursorId));
+
+ killCursorFromDifferentOpCtx(nss, cursorId);
+
+ ASSERT_EQ(_opCtx->checkForInterruptNoAssert(), ErrorCodes::CursorKilled);
ASSERT(!isMockCursorKilled(0));
// Pinned cursor should remain alive after reaping.
diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h
index 5690772dc2b..418419fdbef 100644
--- a/src/mongo/s/query/router_exec_stage.h
+++ b/src/mongo/s/query/router_exec_stage.h
@@ -141,6 +141,13 @@ public:
doDetachFromOperationContext();
}
+ /**
+ * Returns a pointer to the current OperationContext, or nullptr if there is no context.
+ */
+ OperationContext* getOpCtx() {
+ return _opCtx;
+ }
+
protected:
/**
* Performs any stage-specific reattach actions. Called after the OperationContext has been set
@@ -168,13 +175,6 @@ protected:
return _child.get();
}
- /**
- * Returns a pointer to the current OperationContext, or nullptr if there is no context.
- */
- OperationContext* getOpCtx() {
- return _opCtx;
- }
-
private:
OperationContext* _opCtx = nullptr;
std::unique_ptr<RouterExecStage> _child;