diff options
-rw-r--r-- | src/mongo/db/operation_context.h | 7 | ||||
-rw-r--r-- | src/mongo/db/operation_context_impl.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/operation_context_impl.h | 2 | ||||
-rw-r--r-- | src/mongo/db/operation_context_noop.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_hybrid.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_impl.cpp | 38 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_impl.h | 16 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_impl_test.cpp | 99 |
9 files changed, 162 insertions, 16 deletions
diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h index c452f3d777b..e425b6941ab 100644 --- a/src/mongo/db/operation_context.h +++ b/src/mongo/db/operation_context.h @@ -114,6 +114,13 @@ namespace mongo { virtual CurOp* getCurOp() const = 0; /** + * Returns the operation ID associated with this operation. + * WARNING: Due to SERVER-14995, this OpID is not guaranteed to stay the same for the + * lifetime of this OperationContext. + */ + virtual unsigned int getOpID() const = 0; + + /** * @return true if this instance is primary for this namespace */ virtual bool isPrimaryFor( const StringData& ns ) = 0; diff --git a/src/mongo/db/operation_context_impl.cpp b/src/mongo/db/operation_context_impl.cpp index fed76022dbb..07fef4bd3d9 100644 --- a/src/mongo/db/operation_context_impl.cpp +++ b/src/mongo/db/operation_context_impl.cpp @@ -83,6 +83,10 @@ namespace mongo { return cc().curop(); } + unsigned int OperationContextImpl::getOpID() const { + return getCurOp()->opNum(); + } + // Enabling the checkForInterruptFail fail point will start a game of random chance on the // connection specified in the fail point data, generating an interrupt with a given fixed // probability. Example invocation: @@ -153,11 +157,12 @@ namespace mongo { } if (c.curop()->killPending()) { - uasserted(11601, "operation was interrupted"); + uasserted(ErrorCodes::Interrupted, "operation was interrupted"); } } Status OperationContextImpl::checkForInterruptNoAssert() const { + // TODO(spencer): Unify error codes and implementation with checkForInterrupt() Client& c = cc(); if (getGlobalEnvironment()->getKillAllOperations()) { diff --git a/src/mongo/db/operation_context_impl.h b/src/mongo/db/operation_context_impl.h index d154b70dc75..95e09e7f1c1 100644 --- a/src/mongo/db/operation_context_impl.h +++ b/src/mongo/db/operation_context_impl.h @@ -58,6 +58,8 @@ namespace mongo { virtual CurOp* getCurOp() const; + virtual unsigned int getOpID() const; + virtual void checkForInterrupt(bool heedMutex = true) const; virtual Status checkForInterruptNoAssert() const; diff --git a/src/mongo/db/operation_context_noop.h b/src/mongo/db/operation_context_noop.h index 28b4fad8972..7f47193a78f 100644 --- a/src/mongo/db/operation_context_noop.h +++ b/src/mongo/db/operation_context_noop.h @@ -93,6 +93,10 @@ namespace mongo { return string(); }; + virtual unsigned int getOpID() const { + return 0; + } + virtual Transaction* getTransaction() { return NULL; } diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 2cf00a9b76b..77a52d751aa 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -53,9 +53,10 @@ env.Library('repl_coordinator_impl', 'replica_set_config_checks.cpp', ], LIBDEPS=['$BUILD_DIR/mongo/db/common', + '$BUILD_DIR/mongo/db/index/index_descriptor', '$BUILD_DIR/mongo/fail_point', '$BUILD_DIR/mongo/foundation', - '$BUILD_DIR/mongo/db/index/index_descriptor', + '$BUILD_DIR/mongo/global_environment_experiment', '$BUILD_DIR/mongo/server_options_core', 'repl_coordinator_interface', 'replica_set_messages', diff --git a/src/mongo/db/repl/repl_coordinator_hybrid.cpp b/src/mongo/db/repl/repl_coordinator_hybrid.cpp index 2d8dc81307a..472448f1507 100644 --- a/src/mongo/db/repl/repl_coordinator_hybrid.cpp +++ b/src/mongo/db/repl/repl_coordinator_hybrid.cpp @@ -32,6 +32,7 @@ #include "mongo/db/repl/repl_coordinator_hybrid.h" +#include "mongo/db/global_environment_experiment.h" #include "mongo/db/repl/isself.h" #include "mongo/db/repl/network_interface_impl.h" #include "mongo/db/repl/repl_coordinator_external_state_impl.h" @@ -50,6 +51,7 @@ namespace repl { new ReplicationCoordinatorExternalStateImpl, new NetworkInterfaceImpl, new TopologyCoordinatorImpl(Seconds(maxSyncSourceLagSecs))) { + getGlobalEnvironment()->registerKillOpListener(&_impl); } HybridReplicationCoordinator::~HybridReplicationCoordinator() {} diff --git a/src/mongo/db/repl/repl_coordinator_impl.cpp b/src/mongo/db/repl/repl_coordinator_impl.cpp index 384d1e62d19..97f632ea4ff 100644 --- a/src/mongo/db/repl/repl_coordinator_impl.cpp +++ b/src/mongo/db/repl/repl_coordinator_impl.cpp @@ -78,9 +78,11 @@ namespace { * in the destructor. */ WaiterInfo(std::vector<WaiterInfo*>* _list, + unsigned int _opID, const OpTime* _opTime, const WriteConcernOptions* _writeConcern, boost::condition_variable* _condVar) : list(_list), + opID(_opID), opTime(_opTime), writeConcern(_writeConcern), condVar(_condVar) { @@ -92,6 +94,7 @@ namespace { } std::vector<WaiterInfo*>* list; + const unsigned int opID; const OpTime* opTime; const WriteConcernOptions* writeConcern; boost::condition_variable* condVar; @@ -397,9 +400,30 @@ namespace { return false; } + void ReplicationCoordinatorImpl::interrupt(unsigned opId) { + boost::lock_guard<boost::mutex> lk(_mutex); + for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin(); + it != _replicationWaiterList.end(); ++it) { + WaiterInfo* info = *it; + if (info->opID == opId) { + info->condVar->notify_all(); + return; + } + } + } + + void ReplicationCoordinatorImpl::interruptAll() { + boost::lock_guard<boost::mutex> lk(_mutex); + for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin(); + it != _replicationWaiterList.end(); ++it) { + WaiterInfo* info = *it; + info->condVar->notify_all(); + } + } + ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::awaitReplication( const OperationContext* txn, - const OpTime& opId, + const OpTime& opTime, const WriteConcernOptions& writeConcern) { // TODO(spencer): handle killop @@ -425,10 +449,18 @@ namespace { } // Must hold _mutex before constructing waitInfo as it will modify _replicationWaiterList - WaiterInfo waitInfo(&_replicationWaiterList, &opId, &writeConcern, &condVar); + WaiterInfo waitInfo( + &_replicationWaiterList, txn->getOpID(), &opTime, &writeConcern, &condVar); - while (!_opReplicatedEnough_inlock(opId, writeConcern)) { + while (!_opReplicatedEnough_inlock(opTime, writeConcern)) { const int elapsed = timer.millis(); + + try { + txn->checkForInterrupt(); + } catch (const DBException& e) { + return StatusAndDuration(e.toStatus(), Milliseconds(elapsed)); + } + if (writeConcern.wTimeout != WriteConcernOptions::kNoTimeout && elapsed > writeConcern.wTimeout) { return StatusAndDuration(Status(ErrorCodes::ExceededTimeLimit, diff --git a/src/mongo/db/repl/repl_coordinator_impl.h b/src/mongo/db/repl/repl_coordinator_impl.h index 5b08198a4c7..b906dc1859d 100644 --- a/src/mongo/db/repl/repl_coordinator_impl.h +++ b/src/mongo/db/repl/repl_coordinator_impl.h @@ -36,6 +36,7 @@ #include "mongo/base/status.h" #include "mongo/bson/optime.h" +#include "mongo/db/global_environment_experiment.h" #include "mongo/db/repl/member_state.h" #include "mongo/db/repl/repl_coordinator.h" #include "mongo/db/repl/repl_coordinator_external_state.h" @@ -50,7 +51,8 @@ namespace repl { class SyncSourceFeedback; class TopologyCoordinator; - class ReplicationCoordinatorImpl : public ReplicationCoordinator { + class ReplicationCoordinatorImpl : public ReplicationCoordinator, + public KillOpListenerInterface { MONGO_DISALLOW_COPYING(ReplicationCoordinatorImpl); public: @@ -74,6 +76,18 @@ namespace repl { virtual MemberState getCurrentMemberState() const; + /* + * Implementation of the KillOpListenerInterface interrupt method so that we can wake up + * threads blocked in awaitReplication() when a killOp command comes in. + */ + virtual void interrupt(unsigned opId); + + /* + * Implementation of the KillOpListenerInterface interruptAll method so that we can wake up + * threads blocked in awaitReplication() when we kill all operations. + */ + virtual void interruptAll(); + virtual ReplicationCoordinator::StatusAndDuration awaitReplication( const OperationContext* txn, const OpTime& ts, diff --git a/src/mongo/db/repl/repl_coordinator_impl_test.cpp b/src/mongo/db/repl/repl_coordinator_impl_test.cpp index ebab42fefe6..60297238be2 100644 --- a/src/mongo/db/repl/repl_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/repl_coordinator_impl_test.cpp @@ -564,10 +564,11 @@ namespace { return _result; } - void start() { + void start(OperationContext* txn) { ASSERT(!_finished); _thread.reset(new boost::thread(stdx::bind(&ReplicationAwaiter::_awaitReplication, - this))); + this, + txn))); } void reset() { @@ -579,9 +580,8 @@ namespace { private: - void _awaitReplication() { - OperationContextNoop txn; - _result = _replCoord->awaitReplication(&txn, _optime, _writeConcern); + void _awaitReplication(OperationContext* txn) { + _result = _replCoord->awaitReplication(txn, _optime, _writeConcern); _finished = true; } @@ -616,7 +616,7 @@ namespace { // 2 nodes waiting for time1 awaiter.setOpTime(time1); awaiter.setWriteConcern(writeConcern); - awaiter.start(); + awaiter.start(&txn); ASSERT_OK(getReplCoord()->setLastOptime(&txn, client1, time1)); ASSERT_OK(getReplCoord()->setLastOptime(&txn, client2, time1)); ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult(); @@ -625,7 +625,7 @@ namespace { // 2 nodes waiting for time2 awaiter.setOpTime(time2); - awaiter.start(); + awaiter.start(&txn); ASSERT_OK(getReplCoord()->setLastOptime(&txn, client2, time2)); ASSERT_OK(getReplCoord()->setLastOptime(&txn, client3, time2)); statusAndDur = awaiter.getResult(); @@ -635,7 +635,7 @@ namespace { // 3 nodes waiting for time2 writeConcern.wNumNodes = 3; awaiter.setWriteConcern(writeConcern); - awaiter.start(); + awaiter.start(&txn); ASSERT_OK(getReplCoord()->setLastOptime(&txn, client1, time2)); statusAndDur = awaiter.getResult(); ASSERT_OK(statusAndDur.status); @@ -663,7 +663,7 @@ namespace { // 2 nodes waiting for time2 awaiter.setOpTime(time2); awaiter.setWriteConcern(writeConcern); - awaiter.start(); + awaiter.start(&txn); ASSERT_OK(getReplCoord()->setLastOptime(&txn, client1, time1)); ASSERT_OK(getReplCoord()->setLastOptime(&txn, client2, time1)); ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult(); @@ -692,7 +692,7 @@ namespace { // 2 nodes waiting for time2 awaiter.setOpTime(time2); awaiter.setWriteConcern(writeConcern); - awaiter.start(); + awaiter.start(&txn); ASSERT_OK(getReplCoord()->setLastOptime(&txn, client1, time1)); ASSERT_OK(getReplCoord()->setLastOptime(&txn, client2, time1)); shutdown(); @@ -701,6 +701,85 @@ namespace { awaiter.reset(); } + class OperationContextNoopWithInterrupt : public OperationContextNoop { + public: + + OperationContextNoopWithInterrupt() : _opID(0), _interruptOp(false) {} + + virtual unsigned int getOpID() const { + return _opID; + } + + /** + * Can only be called before any multi-threaded access to this object has begun. + */ + void setOpID(unsigned int opID) { + _opID = opID; + } + + virtual void checkForInterrupt(bool heedMutex = true) const { + if (_interruptOp) { + uasserted(ErrorCodes::Interrupted, "operation was interrupted"); + } + } + + /** + * Can only be called before any multi-threaded access to this object has begun. + */ + void setInterruptOp(bool interrupt) { + _interruptOp = interrupt; + } + + private: + unsigned int _opID; + bool _interruptOp; + }; + + TEST_F(ReplCoordTest, AwaitReplicationInterrupt) { + // Tests that a thread blocked in awaitReplication can be killed by a killOp operation + assertStartSuccess( + BSON("_id" << "mySet" << + "version" << 2 << + "members" << BSON_ARRAY(BSON("_id" << 0 << "host" << "node1") << + BSON("_id" << 1 << "host" << "node2") << + BSON("_id" << 2 << "host" << "node3"))), + HostAndPort("node1")); + OperationContextNoopWithInterrupt txn; + ReplicationAwaiter awaiter(getReplCoord(), &txn); + + OID client1 = OID::gen(); + OID client2 = OID::gen(); + OpTime time1(1, 1); + OpTime time2(1, 2); + + HandshakeArgs handshake1; + ASSERT_OK(handshake1.initialize(BSON("handshake" << client1 << "member" << 1))); + ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake1)); + HandshakeArgs handshake2; + ASSERT_OK(handshake2.initialize(BSON("handshake" << client2 << "member" << 2))); + ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake2)); + + WriteConcernOptions writeConcern; + writeConcern.wTimeout = WriteConcernOptions::kNoTimeout; + writeConcern.wNumNodes = 2; + + unsigned int opID = 100; + txn.setOpID(opID); + + // 2 nodes waiting for time2 + awaiter.setOpTime(time2); + awaiter.setWriteConcern(writeConcern); + awaiter.start(&txn); + ASSERT_OK(getReplCoord()->setLastOptime(&txn, client1, time1)); + ASSERT_OK(getReplCoord()->setLastOptime(&txn, client2, time1)); + + txn.setInterruptOp(true); + getReplCoord()->interrupt(opID); + ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult(); + ASSERT_EQUALS(ErrorCodes::Interrupted, statusAndDur.status); + awaiter.reset(); + } + TEST_F(ReplCoordTest, AwaitReplicationNamedModes) { // TODO(spencer): Test awaitReplication with w:majority and tag groups warning() << "Test ReplCoordTest.AwaitReplicationNamedModes needs to be written."; |