From 5028ba1a330f86f4f53fdeaa89d3564435086b29 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Mon, 22 Sep 2008 19:08:47 +0000 Subject: Fixed error handling session-busy condition on broker. Added accessors to iterate over broker::SemanticState consumers. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@697951 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/amqp_0_10/SessionHandler.cpp | 12 +++++---- cpp/src/qpid/amqp_0_10/SessionHandler.h | 1 + cpp/src/qpid/broker/SemanticState.h | 10 +++++++ cpp/src/qpid/broker/SessionState.h | 2 ++ cpp/src/qpid/cluster/DumpClient.cpp | 8 ++++++ cpp/src/qpid/cluster/DumpClient.h | 4 ++- cpp/src/qpid/sys/AggregateOutput.h | 9 ++++++- cpp/src/tests/cluster_test.cpp | 44 +++++++++++++++---------------- cpp/src/tests/exception_test.cpp | 8 ++++++ 9 files changed, 68 insertions(+), 30 deletions(-) (limited to 'cpp/src') diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp index 8bf12d248a..c9bb57a13e 100644 --- a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp +++ b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp @@ -83,9 +83,7 @@ void SessionHandler::handleIn(AMQFrame& f) { } catch(const ChannelException& e){ QPID_LOG(error, "Channel exception: " << e.what()); - if (getState()) - peer.detached(getState()->getId().getName(), e.code); - channelException(e.code, e.getMessage()); + peer.detached(name, e.code); } catch(const ConnectionException& e) { QPID_LOG(error, "Connection exception: " << e.what()); @@ -126,11 +124,15 @@ void SessionHandler::checkName(const std::string& name) { << ", expecting: " << getState()->getId().getName())); } -void SessionHandler::attach(const std::string& name, bool force) { +void SessionHandler::attach(const std::string& name_, bool force) { + // Save the name for possible session-busy exception. Session-busy + // can be thrown before we have attached the handler to a valid + // SessionState, and in that case we need the name to send peer.detached + name = name_; if (getState() && name == getState()->getId().getName()) return; // Idempotent if (getState()) - throw SessionBusyException( + throw TransportBusyException( QPID_MSG("Channel " << channel.get() << " already attached to " << getState()->getId())); setState(name, force); QPID_LOG(debug, "Attached channel " << channel.get() << " to " << getState()->getId()); diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.h b/cpp/src/qpid/amqp_0_10/SessionHandler.h index ccbe597bfc..684258bbae 100644 --- a/cpp/src/qpid/amqp_0_10/SessionHandler.h +++ b/cpp/src/qpid/amqp_0_10/SessionHandler.h @@ -106,6 +106,7 @@ class SessionHandler : public framing::AMQP_AllOperations::SessionHandler, Peer peer; bool ignoring; bool sendReady, receiveReady; + std::string name; private: void sendCommandPoint(const SessionPoint&); diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index 94bd929adc..0c56885f8f 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -45,6 +45,7 @@ #include #include +#include namespace qpid { namespace broker { @@ -58,6 +59,7 @@ class SessionContext; class SemanticState : public sys::OutputTask, private boost::noncopyable { + public: class ConsumerImpl : public Consumer, public sys::OutputTask, public boost::enable_shared_from_this { @@ -106,8 +108,11 @@ class SemanticState : public sys::OutputTask, bool hasOutput(); bool doOutput(); + + std::string getName() const { return name; } }; + private: typedef std::map ConsumerImplMap; typedef std::map DtxBufferMap; @@ -190,6 +195,11 @@ class SemanticState : public sys::OutputTask, void attached(); void detached(); + + template void eachConsumer(const F& f) { + outputTasks.eachOutput( + boost::bind(f, boost::bind(&boost::polymorphic_downcast, _1))); + } }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index 5dd57d2299..bdef894f9f 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -100,6 +100,8 @@ class SessionState : public qpid::SessionState, void readyToSend(); + template void eachConsumer(const F& f) { semanticState.eachConsumer(f); } + private: void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id); diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp index c78859cc39..45ccec7166 100644 --- a/cpp/src/qpid/cluster/DumpClient.cpp +++ b/cpp/src/qpid/cluster/DumpClient.cpp @@ -186,8 +186,16 @@ void DumpClient::dumpSession(broker::SessionHandler& sh) { client::Session cs; client::SessionBase_0_10Access(cs).set(simpl); cs.sync(); + + broker::SessionState* ss = sh.getSession(); + ss->eachConsumer(boost::bind(&DumpClient::dumpConsumer, this, _1)); + // FIXME aconway 2008-09-19: remaining session state. QPID_LOG(debug, "Dump done, session " << sh.getSession()->getId()); } +void DumpClient::dumpConsumer(broker::SemanticState::ConsumerImpl* ci) { + QPID_LOG(critical, "DEBUG: dump consumer: " << ci->getName()); +} + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/DumpClient.h b/cpp/src/qpid/cluster/DumpClient.h index 6cd382667a..6ce41a53a9 100644 --- a/cpp/src/qpid/cluster/DumpClient.h +++ b/cpp/src/qpid/cluster/DumpClient.h @@ -24,6 +24,7 @@ #include "qpid/client/Connection.h" #include "qpid/client/AsyncSession.h" +#include "qpid/broker/SemanticState.h" #include "qpid/sys/Runnable.h" #include @@ -69,7 +70,8 @@ class DumpClient : public sys::Runnable { void dumpBinding(const std::string& queue, const broker::QueueBinding& binding); void dumpConnection(const boost::intrusive_ptr& connection); void dumpSession(broker::SessionHandler& s); - + void dumpConsumer(broker::SemanticState::ConsumerImpl*); + private: Url receiver; Cluster& donor; diff --git a/cpp/src/qpid/sys/AggregateOutput.h b/cpp/src/qpid/sys/AggregateOutput.h index 02a53ed50b..af26601f76 100644 --- a/cpp/src/qpid/sys/AggregateOutput.h +++ b/cpp/src/qpid/sys/AggregateOutput.h @@ -21,11 +21,13 @@ #ifndef _AggregateOutput_ #define _AggregateOutput_ -#include #include "Mutex.h" #include "OutputControl.h" #include "OutputTask.h" +#include +#include + namespace qpid { namespace sys { @@ -46,6 +48,11 @@ namespace sys { bool hasOutput(); void addOutputTask(OutputTask* t); void removeOutputTask(OutputTask* t); + + /** Apply f to each OutputTask* in the tasks list */ + template void eachOutput(const F& f) { + std::for_each(tasks.begin(), tasks.end(), f); + } }; } diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index 60f85df02d..6bb5e4a8ca 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -46,7 +46,7 @@ Cluster& getGlobalCluster(); // Defined in qpid/cluster/ClusterPlugin.cpp }} // namespace qpid::cluster -QPID_AUTO_TEST_SUITE(CpgTestSuite) +QPID_AUTO_TEST_SUITE(cluster) using namespace std; using namespace qpid; @@ -147,8 +147,6 @@ void ClusterFixture::add0(bool init) { if (size()) front() = broker0->getPort(); else push_back(broker0->getPort()); } -// For debugging: op << for CPG types. - ostream& operator<<(ostream& o, const cpg_name* n) { return o << qpid::cluster::Cpg::str(*n); } @@ -166,35 +164,35 @@ ostream& operator<<(ostream& o, const pair& array) { return o; } -#if 0 // FIXME aconway 2008-09-10: finish & enable -QPID_AUTO_TEST_CASE(testDumpConsumers) { +#if 0 // FIXME aconway 2008-09-22: enable. +QPID_AUTO_TEST_CASE(DumpConsumers) { ClusterFixture cluster(1); - Client a(cluster[0]); - a.session.queueDeclare("q"); - a.subs.subscribe(a.lq, "q"); + Client c0(cluster[0]); + c0.session.queueDeclare("q"); + c0.subs.subscribe(c0.lq, "q"); + c0.session.messageTransfer(arg::content=Message("before", "q")); + Message m; + BOOST_CHECK(c0.lq.get(m, TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "before"); + // Start new member cluster.add(); - Client b(cluster[1]); - try { - b.connection.newSession(a.session.getId().getName()); - BOOST_FAIL("Expected SessionBusyException for " << a.session.getId().getName()); - } catch (const SessionBusyException&) {} + Client c1(cluster[1]); - // Transfer some messages to the subscription by client a. - Message m; - a.session.messageTransfer(arg::content=Message("aaa", "q")); - BOOST_CHECK(a.lq.get(m, TIME_SEC)); + // Transfer some messages to the subscription by client c0. + c0.session.messageTransfer(arg::content=Message("aaa", "q")); + BOOST_CHECK(c0.lq.get(m, TIME_SEC)); BOOST_CHECK_EQUAL(m.getData(), "aaa"); - b.session.messageTransfer(arg::content=Message("bbb", "q")); - BOOST_CHECK(a.lq.get(m, TIME_SEC)); + c1.session.messageTransfer(arg::content=Message("bbb", "q")); + BOOST_CHECK(c0.lq.get(m, TIME_SEC)); BOOST_CHECK_EQUAL(m.getData(), "bbb"); // Verify that the queue has been drained on both brokers. // This proves that the consumer was replicated when the second broker joined. - BOOST_CHECK_EQUAL(a.session.queueQuery("q").getMessageCount(), (unsigned)0); + BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u); + BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); } - #endif QPID_AUTO_TEST_CASE(testCatchupSharedState) { @@ -218,8 +216,8 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) { // Do some work post-join cluster.waitFor(2); c0.session.messageTransfer(arg::content=Message("pbar","p")); - - // Verify new broker has all state. + + // Verify new brokers have all state. Message m; Client c1(cluster[1], "c1"); diff --git a/cpp/src/tests/exception_test.cpp b/cpp/src/tests/exception_test.cpp index 1cbe35fff4..339881fa9d 100644 --- a/cpp/src/tests/exception_test.cpp +++ b/cpp/src/tests/exception_test.cpp @@ -73,6 +73,14 @@ struct Catcher : public Runnable { } }; +QPID_AUTO_TEST_CASE(TestSessionBusy) { + SessionFixture f; + try { + f.connection.newSession(f.session.getId().getName()); + BOOST_FAIL("Expected SessionBusyException for " << f.session.getId().getName()); + } catch (const Exception&) {} // FIXME aconway 2008-09-22: client is not throwing correct exception. +} + QPID_AUTO_TEST_CASE(DisconnectedPop) { ProxySessionFixture fix; ProxyConnection c(fix.broker->getPort()); -- cgit v1.2.1