summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSpencer T Brody <spencer@mongodb.com>2014-08-21 19:16:47 -0400
committerSpencer T Brody <spencer@mongodb.com>2014-08-25 17:14:59 -0400
commit4e2b90f5d94b2b7194a67f6e7ca766f42c96eddd (patch)
tree9d649192a9f8064104f4fe7e0ac15e59bcbf567d
parent052175eb9e0b793de19575fea37a7fdd95126a50 (diff)
downloadmongo-4e2b90f5d94b2b7194a67f6e7ca766f42c96eddd.tar.gz
SERVER-14371 Implement killOp support to interrupt threads blocked in awaitReplication
-rw-r--r--src/mongo/db/operation_context.h7
-rw-r--r--src/mongo/db/operation_context_impl.cpp7
-rw-r--r--src/mongo/db/operation_context_impl.h2
-rw-r--r--src/mongo/db/operation_context_noop.h4
-rw-r--r--src/mongo/db/repl/SConscript3
-rw-r--r--src/mongo/db/repl/repl_coordinator_hybrid.cpp2
-rw-r--r--src/mongo/db/repl/repl_coordinator_impl.cpp38
-rw-r--r--src/mongo/db/repl/repl_coordinator_impl.h16
-rw-r--r--src/mongo/db/repl/repl_coordinator_impl_test.cpp99
9 files changed, 162 insertions, 16 deletions
diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h
index c452f3d777b..e425b6941ab 100644
--- a/src/mongo/db/operation_context.h
+++ b/src/mongo/db/operation_context.h
@@ -114,6 +114,13 @@ namespace mongo {
virtual CurOp* getCurOp() const = 0;
/**
+ * Returns the operation ID associated with this operation.
+ * WARNING: Due to SERVER-14995, this OpID is not guaranteed to stay the same for the
+ * lifetime of this OperationContext.
+ */
+ virtual unsigned int getOpID() const = 0;
+
+ /**
* @return true if this instance is primary for this namespace
*/
virtual bool isPrimaryFor( const StringData& ns ) = 0;
diff --git a/src/mongo/db/operation_context_impl.cpp b/src/mongo/db/operation_context_impl.cpp
index fed76022dbb..07fef4bd3d9 100644
--- a/src/mongo/db/operation_context_impl.cpp
+++ b/src/mongo/db/operation_context_impl.cpp
@@ -83,6 +83,10 @@ namespace mongo {
return cc().curop();
}
+ unsigned int OperationContextImpl::getOpID() const {
+ return getCurOp()->opNum();
+ }
+
// Enabling the checkForInterruptFail fail point will start a game of random chance on the
// connection specified in the fail point data, generating an interrupt with a given fixed
// probability. Example invocation:
@@ -153,11 +157,12 @@ namespace mongo {
}
if (c.curop()->killPending()) {
- uasserted(11601, "operation was interrupted");
+ uasserted(ErrorCodes::Interrupted, "operation was interrupted");
}
}
Status OperationContextImpl::checkForInterruptNoAssert() const {
+ // TODO(spencer): Unify error codes and implementation with checkForInterrupt()
Client& c = cc();
if (getGlobalEnvironment()->getKillAllOperations()) {
diff --git a/src/mongo/db/operation_context_impl.h b/src/mongo/db/operation_context_impl.h
index d154b70dc75..95e09e7f1c1 100644
--- a/src/mongo/db/operation_context_impl.h
+++ b/src/mongo/db/operation_context_impl.h
@@ -58,6 +58,8 @@ namespace mongo {
virtual CurOp* getCurOp() const;
+ virtual unsigned int getOpID() const;
+
virtual void checkForInterrupt(bool heedMutex = true) const;
virtual Status checkForInterruptNoAssert() const;
diff --git a/src/mongo/db/operation_context_noop.h b/src/mongo/db/operation_context_noop.h
index 28b4fad8972..7f47193a78f 100644
--- a/src/mongo/db/operation_context_noop.h
+++ b/src/mongo/db/operation_context_noop.h
@@ -93,6 +93,10 @@ namespace mongo {
return string();
};
+ virtual unsigned int getOpID() const {
+ return 0;
+ }
+
virtual Transaction* getTransaction() {
return NULL;
}
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 2cf00a9b76b..77a52d751aa 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -53,9 +53,10 @@ env.Library('repl_coordinator_impl',
'replica_set_config_checks.cpp',
],
LIBDEPS=['$BUILD_DIR/mongo/db/common',
+ '$BUILD_DIR/mongo/db/index/index_descriptor',
'$BUILD_DIR/mongo/fail_point',
'$BUILD_DIR/mongo/foundation',
- '$BUILD_DIR/mongo/db/index/index_descriptor',
+ '$BUILD_DIR/mongo/global_environment_experiment',
'$BUILD_DIR/mongo/server_options_core',
'repl_coordinator_interface',
'replica_set_messages',
diff --git a/src/mongo/db/repl/repl_coordinator_hybrid.cpp b/src/mongo/db/repl/repl_coordinator_hybrid.cpp
index 2d8dc81307a..472448f1507 100644
--- a/src/mongo/db/repl/repl_coordinator_hybrid.cpp
+++ b/src/mongo/db/repl/repl_coordinator_hybrid.cpp
@@ -32,6 +32,7 @@
#include "mongo/db/repl/repl_coordinator_hybrid.h"
+#include "mongo/db/global_environment_experiment.h"
#include "mongo/db/repl/isself.h"
#include "mongo/db/repl/network_interface_impl.h"
#include "mongo/db/repl/repl_coordinator_external_state_impl.h"
@@ -50,6 +51,7 @@ namespace repl {
new ReplicationCoordinatorExternalStateImpl,
new NetworkInterfaceImpl,
new TopologyCoordinatorImpl(Seconds(maxSyncSourceLagSecs))) {
+ getGlobalEnvironment()->registerKillOpListener(&_impl);
}
HybridReplicationCoordinator::~HybridReplicationCoordinator() {}
diff --git a/src/mongo/db/repl/repl_coordinator_impl.cpp b/src/mongo/db/repl/repl_coordinator_impl.cpp
index 384d1e62d19..97f632ea4ff 100644
--- a/src/mongo/db/repl/repl_coordinator_impl.cpp
+++ b/src/mongo/db/repl/repl_coordinator_impl.cpp
@@ -78,9 +78,11 @@ namespace {
* in the destructor.
*/
WaiterInfo(std::vector<WaiterInfo*>* _list,
+ unsigned int _opID,
const OpTime* _opTime,
const WriteConcernOptions* _writeConcern,
boost::condition_variable* _condVar) : list(_list),
+ opID(_opID),
opTime(_opTime),
writeConcern(_writeConcern),
condVar(_condVar) {
@@ -92,6 +94,7 @@ namespace {
}
std::vector<WaiterInfo*>* list;
+ const unsigned int opID;
const OpTime* opTime;
const WriteConcernOptions* writeConcern;
boost::condition_variable* condVar;
@@ -397,9 +400,30 @@ namespace {
return false;
}
+ void ReplicationCoordinatorImpl::interrupt(unsigned opId) {
+ boost::lock_guard<boost::mutex> lk(_mutex);
+ for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin();
+ it != _replicationWaiterList.end(); ++it) {
+ WaiterInfo* info = *it;
+ if (info->opID == opId) {
+ info->condVar->notify_all();
+ return;
+ }
+ }
+ }
+
+ void ReplicationCoordinatorImpl::interruptAll() {
+ boost::lock_guard<boost::mutex> lk(_mutex);
+ for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin();
+ it != _replicationWaiterList.end(); ++it) {
+ WaiterInfo* info = *it;
+ info->condVar->notify_all();
+ }
+ }
+
ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::awaitReplication(
const OperationContext* txn,
- const OpTime& opId,
+ const OpTime& opTime,
const WriteConcernOptions& writeConcern) {
// TODO(spencer): handle killop
@@ -425,10 +449,18 @@ namespace {
}
// Must hold _mutex before constructing waitInfo as it will modify _replicationWaiterList
- WaiterInfo waitInfo(&_replicationWaiterList, &opId, &writeConcern, &condVar);
+ WaiterInfo waitInfo(
+ &_replicationWaiterList, txn->getOpID(), &opTime, &writeConcern, &condVar);
- while (!_opReplicatedEnough_inlock(opId, writeConcern)) {
+ while (!_opReplicatedEnough_inlock(opTime, writeConcern)) {
const int elapsed = timer.millis();
+
+ try {
+ txn->checkForInterrupt();
+ } catch (const DBException& e) {
+ return StatusAndDuration(e.toStatus(), Milliseconds(elapsed));
+ }
+
if (writeConcern.wTimeout != WriteConcernOptions::kNoTimeout &&
elapsed > writeConcern.wTimeout) {
return StatusAndDuration(Status(ErrorCodes::ExceededTimeLimit,
diff --git a/src/mongo/db/repl/repl_coordinator_impl.h b/src/mongo/db/repl/repl_coordinator_impl.h
index 5b08198a4c7..b906dc1859d 100644
--- a/src/mongo/db/repl/repl_coordinator_impl.h
+++ b/src/mongo/db/repl/repl_coordinator_impl.h
@@ -36,6 +36,7 @@
#include "mongo/base/status.h"
#include "mongo/bson/optime.h"
+#include "mongo/db/global_environment_experiment.h"
#include "mongo/db/repl/member_state.h"
#include "mongo/db/repl/repl_coordinator.h"
#include "mongo/db/repl/repl_coordinator_external_state.h"
@@ -50,7 +51,8 @@ namespace repl {
class SyncSourceFeedback;
class TopologyCoordinator;
- class ReplicationCoordinatorImpl : public ReplicationCoordinator {
+ class ReplicationCoordinatorImpl : public ReplicationCoordinator,
+ public KillOpListenerInterface {
MONGO_DISALLOW_COPYING(ReplicationCoordinatorImpl);
public:
@@ -74,6 +76,18 @@ namespace repl {
virtual MemberState getCurrentMemberState() const;
+ /*
+ * Implementation of the KillOpListenerInterface interrupt method so that we can wake up
+ * threads blocked in awaitReplication() when a killOp command comes in.
+ */
+ virtual void interrupt(unsigned opId);
+
+ /*
+ * Implementation of the KillOpListenerInterface interruptAll method so that we can wake up
+ * threads blocked in awaitReplication() when we kill all operations.
+ */
+ virtual void interruptAll();
+
virtual ReplicationCoordinator::StatusAndDuration awaitReplication(
const OperationContext* txn,
const OpTime& ts,
diff --git a/src/mongo/db/repl/repl_coordinator_impl_test.cpp b/src/mongo/db/repl/repl_coordinator_impl_test.cpp
index ebab42fefe6..60297238be2 100644
--- a/src/mongo/db/repl/repl_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/repl_coordinator_impl_test.cpp
@@ -564,10 +564,11 @@ namespace {
return _result;
}
- void start() {
+ void start(OperationContext* txn) {
ASSERT(!_finished);
_thread.reset(new boost::thread(stdx::bind(&ReplicationAwaiter::_awaitReplication,
- this)));
+ this,
+ txn)));
}
void reset() {
@@ -579,9 +580,8 @@ namespace {
private:
- void _awaitReplication() {
- OperationContextNoop txn;
- _result = _replCoord->awaitReplication(&txn, _optime, _writeConcern);
+ void _awaitReplication(OperationContext* txn) {
+ _result = _replCoord->awaitReplication(txn, _optime, _writeConcern);
_finished = true;
}
@@ -616,7 +616,7 @@ namespace {
// 2 nodes waiting for time1
awaiter.setOpTime(time1);
awaiter.setWriteConcern(writeConcern);
- awaiter.start();
+ awaiter.start(&txn);
ASSERT_OK(getReplCoord()->setLastOptime(&txn, client1, time1));
ASSERT_OK(getReplCoord()->setLastOptime(&txn, client2, time1));
ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult();
@@ -625,7 +625,7 @@ namespace {
// 2 nodes waiting for time2
awaiter.setOpTime(time2);
- awaiter.start();
+ awaiter.start(&txn);
ASSERT_OK(getReplCoord()->setLastOptime(&txn, client2, time2));
ASSERT_OK(getReplCoord()->setLastOptime(&txn, client3, time2));
statusAndDur = awaiter.getResult();
@@ -635,7 +635,7 @@ namespace {
// 3 nodes waiting for time2
writeConcern.wNumNodes = 3;
awaiter.setWriteConcern(writeConcern);
- awaiter.start();
+ awaiter.start(&txn);
ASSERT_OK(getReplCoord()->setLastOptime(&txn, client1, time2));
statusAndDur = awaiter.getResult();
ASSERT_OK(statusAndDur.status);
@@ -663,7 +663,7 @@ namespace {
// 2 nodes waiting for time2
awaiter.setOpTime(time2);
awaiter.setWriteConcern(writeConcern);
- awaiter.start();
+ awaiter.start(&txn);
ASSERT_OK(getReplCoord()->setLastOptime(&txn, client1, time1));
ASSERT_OK(getReplCoord()->setLastOptime(&txn, client2, time1));
ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult();
@@ -692,7 +692,7 @@ namespace {
// 2 nodes waiting for time2
awaiter.setOpTime(time2);
awaiter.setWriteConcern(writeConcern);
- awaiter.start();
+ awaiter.start(&txn);
ASSERT_OK(getReplCoord()->setLastOptime(&txn, client1, time1));
ASSERT_OK(getReplCoord()->setLastOptime(&txn, client2, time1));
shutdown();
@@ -701,6 +701,85 @@ namespace {
awaiter.reset();
}
+ class OperationContextNoopWithInterrupt : public OperationContextNoop {
+ public:
+
+ OperationContextNoopWithInterrupt() : _opID(0), _interruptOp(false) {}
+
+ virtual unsigned int getOpID() const {
+ return _opID;
+ }
+
+ /**
+ * Can only be called before any multi-threaded access to this object has begun.
+ */
+ void setOpID(unsigned int opID) {
+ _opID = opID;
+ }
+
+ virtual void checkForInterrupt(bool heedMutex = true) const {
+ if (_interruptOp) {
+ uasserted(ErrorCodes::Interrupted, "operation was interrupted");
+ }
+ }
+
+ /**
+ * Can only be called before any multi-threaded access to this object has begun.
+ */
+ void setInterruptOp(bool interrupt) {
+ _interruptOp = interrupt;
+ }
+
+ private:
+ unsigned int _opID;
+ bool _interruptOp;
+ };
+
+ TEST_F(ReplCoordTest, AwaitReplicationInterrupt) {
+ // Tests that a thread blocked in awaitReplication can be killed by a killOp operation
+ assertStartSuccess(
+ BSON("_id" << "mySet" <<
+ "version" << 2 <<
+ "members" << BSON_ARRAY(BSON("_id" << 0 << "host" << "node1") <<
+ BSON("_id" << 1 << "host" << "node2") <<
+ BSON("_id" << 2 << "host" << "node3"))),
+ HostAndPort("node1"));
+ OperationContextNoopWithInterrupt txn;
+ ReplicationAwaiter awaiter(getReplCoord(), &txn);
+
+ OID client1 = OID::gen();
+ OID client2 = OID::gen();
+ OpTime time1(1, 1);
+ OpTime time2(1, 2);
+
+ HandshakeArgs handshake1;
+ ASSERT_OK(handshake1.initialize(BSON("handshake" << client1 << "member" << 1)));
+ ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake1));
+ HandshakeArgs handshake2;
+ ASSERT_OK(handshake2.initialize(BSON("handshake" << client2 << "member" << 2)));
+ ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake2));
+
+ WriteConcernOptions writeConcern;
+ writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
+ writeConcern.wNumNodes = 2;
+
+ unsigned int opID = 100;
+ txn.setOpID(opID);
+
+ // 2 nodes waiting for time2
+ awaiter.setOpTime(time2);
+ awaiter.setWriteConcern(writeConcern);
+ awaiter.start(&txn);
+ ASSERT_OK(getReplCoord()->setLastOptime(&txn, client1, time1));
+ ASSERT_OK(getReplCoord()->setLastOptime(&txn, client2, time1));
+
+ txn.setInterruptOp(true);
+ getReplCoord()->interrupt(opID);
+ ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult();
+ ASSERT_EQUALS(ErrorCodes::Interrupted, statusAndDur.status);
+ awaiter.reset();
+ }
+
TEST_F(ReplCoordTest, AwaitReplicationNamedModes) {
// TODO(spencer): Test awaitReplication with w:majority and tag groups
warning() << "Test ReplCoordTest.AwaitReplicationNamedModes needs to be written.";