summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/amqp_0_10/SessionHandler.cpp12
-rw-r--r--cpp/src/qpid/amqp_0_10/SessionHandler.h1
-rw-r--r--cpp/src/qpid/broker/SemanticState.h10
-rw-r--r--cpp/src/qpid/broker/SessionState.h2
-rw-r--r--cpp/src/qpid/cluster/DumpClient.cpp8
-rw-r--r--cpp/src/qpid/cluster/DumpClient.h4
-rw-r--r--cpp/src/qpid/sys/AggregateOutput.h9
-rw-r--r--cpp/src/tests/cluster_test.cpp44
-rw-r--r--cpp/src/tests/exception_test.cpp8
9 files changed, 68 insertions, 30 deletions
diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
index 8bf12d248a..c9bb57a13e 100644
--- a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
+++ b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
@@ -83,9 +83,7 @@ void SessionHandler::handleIn(AMQFrame& f) {
}
catch(const ChannelException& e){
QPID_LOG(error, "Channel exception: " << e.what());
- if (getState())
- peer.detached(getState()->getId().getName(), e.code);
- channelException(e.code, e.getMessage());
+ peer.detached(name, e.code);
}
catch(const ConnectionException& e) {
QPID_LOG(error, "Connection exception: " << e.what());
@@ -126,11 +124,15 @@ void SessionHandler::checkName(const std::string& name) {
<< ", expecting: " << getState()->getId().getName()));
}
-void SessionHandler::attach(const std::string& name, bool force) {
+void SessionHandler::attach(const std::string& name_, bool force) {
+ // Save the name for possible session-busy exception. Session-busy
+ // can be thrown before we have attached the handler to a valid
+ // SessionState, and in that case we need the name to send peer.detached
+ name = name_;
if (getState() && name == getState()->getId().getName())
return; // Idempotent
if (getState())
- throw SessionBusyException(
+ throw TransportBusyException(
QPID_MSG("Channel " << channel.get() << " already attached to " << getState()->getId()));
setState(name, force);
QPID_LOG(debug, "Attached channel " << channel.get() << " to " << getState()->getId());
diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.h b/cpp/src/qpid/amqp_0_10/SessionHandler.h
index ccbe597bfc..684258bbae 100644
--- a/cpp/src/qpid/amqp_0_10/SessionHandler.h
+++ b/cpp/src/qpid/amqp_0_10/SessionHandler.h
@@ -106,6 +106,7 @@ class SessionHandler : public framing::AMQP_AllOperations::SessionHandler,
Peer peer;
bool ignoring;
bool sendReady, receiveReady;
+ std::string name;
private:
void sendCommandPoint(const SessionPoint&);
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index 94bd929adc..0c56885f8f 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -45,6 +45,7 @@
#include <vector>
#include <boost/intrusive_ptr.hpp>
+#include <boost/cast.hpp>
namespace qpid {
namespace broker {
@@ -58,6 +59,7 @@ class SessionContext;
class SemanticState : public sys::OutputTask,
private boost::noncopyable
{
+ public:
class ConsumerImpl : public Consumer, public sys::OutputTask,
public boost::enable_shared_from_this<ConsumerImpl>
{
@@ -106,8 +108,11 @@ class SemanticState : public sys::OutputTask,
bool hasOutput();
bool doOutput();
+
+ std::string getName() const { return name; }
};
+ private:
typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap;
typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
@@ -190,6 +195,11 @@ class SemanticState : public sys::OutputTask,
void attached();
void detached();
+
+ template <class F> void eachConsumer(const F& f) {
+ outputTasks.eachOutput(
+ boost::bind(f, boost::bind(&boost::polymorphic_downcast<ConsumerImpl*, OutputTask>, _1)));
+ }
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index 5dd57d2299..bdef894f9f 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -100,6 +100,8 @@ class SessionState : public qpid::SessionState,
void readyToSend();
+ template <class F> void eachConsumer(const F& f) { semanticState.eachConsumer(f); }
+
private:
void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);
diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp
index c78859cc39..45ccec7166 100644
--- a/cpp/src/qpid/cluster/DumpClient.cpp
+++ b/cpp/src/qpid/cluster/DumpClient.cpp
@@ -186,8 +186,16 @@ void DumpClient::dumpSession(broker::SessionHandler& sh) {
client::Session cs;
client::SessionBase_0_10Access(cs).set(simpl);
cs.sync();
+
+ broker::SessionState* ss = sh.getSession();
+ ss->eachConsumer(boost::bind(&DumpClient::dumpConsumer, this, _1));
+
// FIXME aconway 2008-09-19: remaining session state.
QPID_LOG(debug, "Dump done, session " << sh.getSession()->getId());
}
+void DumpClient::dumpConsumer(broker::SemanticState::ConsumerImpl* ci) {
+ QPID_LOG(critical, "DEBUG: dump consumer: " << ci->getName());
+}
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/DumpClient.h b/cpp/src/qpid/cluster/DumpClient.h
index 6cd382667a..6ce41a53a9 100644
--- a/cpp/src/qpid/cluster/DumpClient.h
+++ b/cpp/src/qpid/cluster/DumpClient.h
@@ -24,6 +24,7 @@
#include "qpid/client/Connection.h"
#include "qpid/client/AsyncSession.h"
+#include "qpid/broker/SemanticState.h"
#include "qpid/sys/Runnable.h"
#include <boost/shared_ptr.hpp>
@@ -69,7 +70,8 @@ class DumpClient : public sys::Runnable {
void dumpBinding(const std::string& queue, const broker::QueueBinding& binding);
void dumpConnection(const boost::intrusive_ptr<Connection>& connection);
void dumpSession(broker::SessionHandler& s);
-
+ void dumpConsumer(broker::SemanticState::ConsumerImpl*);
+
private:
Url receiver;
Cluster& donor;
diff --git a/cpp/src/qpid/sys/AggregateOutput.h b/cpp/src/qpid/sys/AggregateOutput.h
index 02a53ed50b..af26601f76 100644
--- a/cpp/src/qpid/sys/AggregateOutput.h
+++ b/cpp/src/qpid/sys/AggregateOutput.h
@@ -21,11 +21,13 @@
#ifndef _AggregateOutput_
#define _AggregateOutput_
-#include <vector>
#include "Mutex.h"
#include "OutputControl.h"
#include "OutputTask.h"
+#include <algorithm>
+#include <vector>
+
namespace qpid {
namespace sys {
@@ -46,6 +48,11 @@ namespace sys {
bool hasOutput();
void addOutputTask(OutputTask* t);
void removeOutputTask(OutputTask* t);
+
+ /** Apply f to each OutputTask* in the tasks list */
+ template <class F> void eachOutput(const F& f) {
+ std::for_each(tasks.begin(), tasks.end(), f);
+ }
};
}
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index 60f85df02d..6bb5e4a8ca 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -46,7 +46,7 @@ Cluster& getGlobalCluster(); // Defined in qpid/cluster/ClusterPlugin.cpp
}} // namespace qpid::cluster
-QPID_AUTO_TEST_SUITE(CpgTestSuite)
+QPID_AUTO_TEST_SUITE(cluster)
using namespace std;
using namespace qpid;
@@ -147,8 +147,6 @@ void ClusterFixture::add0(bool init) {
if (size()) front() = broker0->getPort(); else push_back(broker0->getPort());
}
-// For debugging: op << for CPG types.
-
ostream& operator<<(ostream& o, const cpg_name* n) {
return o << qpid::cluster::Cpg::str(*n);
}
@@ -166,35 +164,35 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) {
return o;
}
-#if 0 // FIXME aconway 2008-09-10: finish & enable
-QPID_AUTO_TEST_CASE(testDumpConsumers) {
+#if 0 // FIXME aconway 2008-09-22: enable.
+QPID_AUTO_TEST_CASE(DumpConsumers) {
ClusterFixture cluster(1);
- Client a(cluster[0]);
- a.session.queueDeclare("q");
- a.subs.subscribe(a.lq, "q");
+ Client c0(cluster[0]);
+ c0.session.queueDeclare("q");
+ c0.subs.subscribe(c0.lq, "q");
+ c0.session.messageTransfer(arg::content=Message("before", "q"));
+ Message m;
+ BOOST_CHECK(c0.lq.get(m, TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "before");
+ // Start new member
cluster.add();
- Client b(cluster[1]);
- try {
- b.connection.newSession(a.session.getId().getName());
- BOOST_FAIL("Expected SessionBusyException for " << a.session.getId().getName());
- } catch (const SessionBusyException&) {}
+ Client c1(cluster[1]);
- // Transfer some messages to the subscription by client a.
- Message m;
- a.session.messageTransfer(arg::content=Message("aaa", "q"));
- BOOST_CHECK(a.lq.get(m, TIME_SEC));
+ // Transfer some messages to the subscription by client c0.
+ c0.session.messageTransfer(arg::content=Message("aaa", "q"));
+ BOOST_CHECK(c0.lq.get(m, TIME_SEC));
BOOST_CHECK_EQUAL(m.getData(), "aaa");
- b.session.messageTransfer(arg::content=Message("bbb", "q"));
- BOOST_CHECK(a.lq.get(m, TIME_SEC));
+ c1.session.messageTransfer(arg::content=Message("bbb", "q"));
+ BOOST_CHECK(c0.lq.get(m, TIME_SEC));
BOOST_CHECK_EQUAL(m.getData(), "bbb");
// Verify that the queue has been drained on both brokers.
// This proves that the consumer was replicated when the second broker joined.
- BOOST_CHECK_EQUAL(a.session.queueQuery("q").getMessageCount(), (unsigned)0);
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
}
-
#endif
QPID_AUTO_TEST_CASE(testCatchupSharedState) {
@@ -218,8 +216,8 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) {
// Do some work post-join
cluster.waitFor(2);
c0.session.messageTransfer(arg::content=Message("pbar","p"));
-
- // Verify new broker has all state.
+
+ // Verify new brokers have all state.
Message m;
Client c1(cluster[1], "c1");
diff --git a/cpp/src/tests/exception_test.cpp b/cpp/src/tests/exception_test.cpp
index 1cbe35fff4..339881fa9d 100644
--- a/cpp/src/tests/exception_test.cpp
+++ b/cpp/src/tests/exception_test.cpp
@@ -73,6 +73,14 @@ struct Catcher : public Runnable {
}
};
+QPID_AUTO_TEST_CASE(TestSessionBusy) {
+ SessionFixture f;
+ try {
+ f.connection.newSession(f.session.getId().getName());
+ BOOST_FAIL("Expected SessionBusyException for " << f.session.getId().getName());
+ } catch (const Exception&) {} // FIXME aconway 2008-09-22: client is not throwing correct exception.
+}
+
QPID_AUTO_TEST_CASE(DisconnectedPop) {
ProxySessionFixture fix;
ProxyConnection c(fix.broker->getPort());