diff options
author | Alan Conway <aconway@apache.org> | 2011-02-08 20:28:16 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-02-08 20:28:16 +0000 |
commit | c2619b8aaf8fa95d08a58d2e6567070815b82f55 (patch) | |
tree | df6d3accb5f84ac3f7d249e87cec36fc4a2a6284 | |
parent | 706ef87511a99b1fe6f37e1b51fb152ec34c315e (diff) | |
download | qpid-python-c2619b8aaf8fa95d08a58d2e6567070815b82f55.tar.gz |
QPID-3045 - sporadic failure of cluster_tests.ShortTests.test_route_update
Sporadically the test was failing because the session associated with
an inter-broker bridge was created out of order with other
objects. This is unlikely to cause a fatal cluster inconsistency in
practice but it has been corrected in any case. The fix was to delay
creation of the management object for a bridge session till a point
which is consistent on all cluster members.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1068554 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/broker/SessionHandler.cpp | 14 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.h | 8 | ||||
-rw-r--r-- | cpp/src/tests/cluster_tests.fail | 2 |
4 files changed, 26 insertions, 11 deletions
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index 7106f85807..69b364ad7b 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -33,7 +33,7 @@ using namespace qpid::sys; SessionHandler::SessionHandler(Connection& c, ChannelId ch) : amqp_0_10::SessionHandler(&c.getOutput(), ch), - connection(c), + connection(c), proxy(out), clusterOrderProxy(c.getClusterOrderOutput() ? new SetChannelProxy(ch, c.getClusterOrderOutput()) : 0) {} @@ -69,7 +69,7 @@ void SessionHandler::handleDetach() { if (session.get()) connection.getBroker().getSessionManager().detach(session); assert(!session.get()); - connection.closeChannel(channel.get()); + connection.closeChannel(channel.get()); } void SessionHandler::setState(const std::string& name, bool force) { @@ -78,7 +78,7 @@ void SessionHandler::setState(const std::string& name, bool force) { session = connection.broker.getSessionManager().attach(*this, id, force); } -void SessionHandler::detaching() +void SessionHandler::detaching() { assert(session.get()); session->disableOutput(); @@ -98,7 +98,10 @@ void SessionHandler::attachAs(const std::string& name) { SessionId id(connection.getUserId(), name); SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig(); - session.reset(new SessionState(connection.getBroker(), *this, id, config)); + // Delay creating management object till attached(). In a cluster, + // only the active link broker calls attachAs but all brokers + // receive the subsequent attached() call. + session.reset(new SessionState(connection.getBroker(), *this, id, config, true)); sendAttach(false); } @@ -109,6 +112,7 @@ void SessionHandler::attachAs(const std::string& name) void SessionHandler::attached(const std::string& name) { if (session.get()) { + session->addManagementObject(); // Delayed from attachAs() amqp_0_10::SessionHandler::attached(name); } else { SessionId id(connection.getUserId(), name); @@ -117,5 +121,5 @@ void SessionHandler::attached(const std::string& name) markReadyToSend(); } } - + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 6f02399795..1ca7b6dfc1 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -53,7 +53,8 @@ using qpid::sys::AbsTime; namespace _qmf = qmf::org::apache::qpid::broker; SessionState::SessionState( - Broker& b, SessionHandler& h, const SessionId& id, const SessionState::Configuration& config) + Broker& b, SessionHandler& h, const SessionId& id, + const SessionState::Configuration& config, bool delayManagement) : qpid::SessionState(id, config), broker(b), handler(&h), semanticState(*this, *this), @@ -71,6 +72,12 @@ SessionState::SessionState( QPID_LOG(warning, getId() << ": Unable to flow control client - client doesn't support"); } } + if (!delayManagement) addManagementObject(); + attach(h); +} + +void SessionState::addManagementObject() { + if (GetManagementObject()) return; // Already added. Manageable* parent = broker.GetVhostObject (); if (parent != 0) { ManagementAgent* agent = getBroker().getManagementAgent(); @@ -80,11 +87,11 @@ SessionState::SessionState( mgmtObject->set_attached (0); mgmtObject->set_detachedLifespan (0); mgmtObject->clr_expireTime(); - if (rateFlowcontrol) mgmtObject->set_maxClientRate(maxRate); + if (rateFlowcontrol) + mgmtObject->set_maxClientRate(rateFlowcontrol->getRate()); agent->addObject(mgmtObject); } } - attach(h); } SessionState::~SessionState() { diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index 3dcb0a62d4..be79eb0eab 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -72,7 +72,8 @@ class SessionState : public qpid::SessionState, public framing::FrameHandler::InOutHandler { public: - SessionState(Broker&, SessionHandler&, const SessionId&, const SessionState::Configuration&); + SessionState(Broker&, SessionHandler&, const SessionId&, + const SessionState::Configuration&, bool delayManagement=false); ~SessionState(); bool isAttached() const { return handler; } @@ -122,8 +123,11 @@ class SessionState : public qpid::SessionState, const SessionId& getSessionId() const { return getId(); } - private: + // Used to delay creation of management object for sessions + // belonging to inter-broker bridges + void addManagementObject(); + private: void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id); void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id); void enqueued(boost::intrusive_ptr<Message> msg); diff --git a/cpp/src/tests/cluster_tests.fail b/cpp/src/tests/cluster_tests.fail index 5117cd621d..b28b04f643 100644 --- a/cpp/src/tests/cluster_tests.fail +++ b/cpp/src/tests/cluster_tests.fail @@ -1,3 +1,3 @@ -cluster_tests.ShortTests.test_route_update + |