summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/lib/broker/Broker.cpp24
-rw-r--r--cpp/lib/broker/Broker.h12
-rw-r--r--cpp/lib/broker/BrokerAdapter.cpp18
-rw-r--r--cpp/lib/broker/BrokerAdapter.h6
-rw-r--r--cpp/lib/broker/Connection.cpp17
-rw-r--r--cpp/lib/broker/Connection.h12
-rw-r--r--cpp/tests/InProcessBroker.h11
7 files changed, 55 insertions, 45 deletions
diff --git a/cpp/lib/broker/Broker.cpp b/cpp/lib/broker/Broker.cpp
index b9e7990861..d3d68502f1 100644
--- a/cpp/lib/broker/Broker.cpp
+++ b/cpp/lib/broker/Broker.cpp
@@ -45,11 +45,8 @@ const std::string amq_topic("amq.topic");
const std::string amq_fanout("amq.fanout");
const std::string amq_match("amq.match");
-Broker::Broker(const Configuration& config) :
- acceptor(Acceptor::create(config.getPort(),
- config.getConnectionBacklog(),
- config.getWorkerThreads(),
- config.isTrace())),
+Broker::Broker(const Configuration& conf) :
+ config(conf),
queues(store.get()),
timeout(30000),
stagingThreshold(0),
@@ -89,17 +86,30 @@ Broker::shared_ptr Broker::create(const Configuration& config) {
}
void Broker::run() {
- acceptor->run(&factory);
+ getAcceptor().run(&factory);
}
void Broker::shutdown() {
- acceptor->shutdown();
+ getAcceptor().shutdown();
}
Broker::~Broker() {
shutdown();
}
+int16_t Broker::getPort() const { return getAcceptor().getPort(); }
+
+Acceptor& Broker::getAcceptor() const {
+ if (!acceptor)
+ const_cast<Acceptor::shared_ptr&>(acceptor) =
+ Acceptor::create(config.getPort(),
+ config.getConnectionBacklog(),
+ config.getWorkerThreads(),
+ config.isTrace());
+ return *acceptor;
+}
+
+
const int16_t Broker::DEFAULT_PORT(5672);
diff --git a/cpp/lib/broker/Broker.h b/cpp/lib/broker/Broker.h
index ef5e7bbafb..27d2fec006 100644
--- a/cpp/lib/broker/Broker.h
+++ b/cpp/lib/broker/Broker.h
@@ -42,8 +42,8 @@ namespace broker {
/**
* A broker instance.
*/
-class Broker : public qpid::sys::Runnable,
- public qpid::SharedObject<Broker>
+class Broker : public sys::Runnable,
+ public SharedObject<Broker>
{
public:
static const int16_t DEFAULT_PORT;
@@ -67,7 +67,7 @@ class Broker : public qpid::sys::Runnable,
* port, which will be different if the configured port is
* 0.
*/
- virtual int16_t getPort() const { return acceptor->getPort(); }
+ virtual int16_t getPort() const;
/**
* Run the broker. Implements Runnable::run() so the broker
@@ -86,9 +86,11 @@ class Broker : public qpid::sys::Runnable,
AutoDelete& getCleaner() { return cleaner; }
private:
- Broker(const Configuration& config);
+ Broker(const Configuration& config);
+ sys::Acceptor& getAcceptor() const;
- qpid::sys::Acceptor::shared_ptr acceptor;
+ Configuration config;
+ sys::Acceptor::shared_ptr acceptor;
std::auto_ptr<MessageStore> store;
QueueRegistry queues;
ExchangeRegistry exchanges;
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp
index abf0b3852d..73ece8b264 100644
--- a/cpp/lib/broker/BrokerAdapter.cpp
+++ b/cpp/lib/broker/BrokerAdapter.cpp
@@ -183,14 +183,18 @@ class BrokerAdapter::ServerOps : public AMQP_ServerOperations
void BrokerAdapter::ServerOps::ConnectionHandlerImpl::startOk(
const MethodContext& context , const FieldTable& /*clientProperties*/, const string& /*mechanism*/,
const string& /*response*/, const string& /*locale*/){
- connection.client->getConnection().tune(context, 100, connection.framemax, connection.heartbeat);
+ connection.client->getConnection().tune(
+ context, 100, connection.getFrameMax(), connection.getHeartbeat());
}
void BrokerAdapter::ServerOps::ConnectionHandlerImpl::secureOk(const MethodContext&, const string& /*response*/){}
-void BrokerAdapter::ServerOps::ConnectionHandlerImpl::tuneOk(const MethodContext&, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){
- connection.framemax = framemax;
- connection.heartbeat = heartbeat;
+void BrokerAdapter::ServerOps::ConnectionHandlerImpl::tuneOk(
+ const MethodContext&, u_int16_t /*channelmax*/,
+ u_int32_t framemax, u_int16_t heartbeat)
+{
+ connection.setFrameMax(framemax);
+ connection.setHeartbeat(heartbeat);
}
void BrokerAdapter::ServerOps::ConnectionHandlerImpl::open(const MethodContext& context, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
@@ -496,14 +500,14 @@ BrokerAdapter::ServerOps::ChannelHandlerImpl::resume(
}
BrokerAdapter::BrokerAdapter(
- Channel* ch, Connection& c, Broker& b
+ std::auto_ptr<Channel> ch, Connection& c, Broker& b
) :
channel(ch),
connection(c),
broker(b),
- serverOps(new ServerOps(*ch,c,b))
+ serverOps(new ServerOps(*channel,c,b))
{
- init(ch->getId(), c.getOutput(), ch->getVersion());
+ init(channel->getId(), c.getOutput(), channel->getVersion());
}
void BrokerAdapter::handleMethodInContext(
diff --git a/cpp/lib/broker/BrokerAdapter.h b/cpp/lib/broker/BrokerAdapter.h
index 339a7f6b48..b797576b39 100644
--- a/cpp/lib/broker/BrokerAdapter.h
+++ b/cpp/lib/broker/BrokerAdapter.h
@@ -18,6 +18,7 @@
* limitations under the License.
*
*/
+#include <memory.h>
#include "AMQP_ServerOperations.h"
#include "BodyHandler.h"
@@ -31,8 +32,6 @@ class AMQMethodBody;
class Connection;
class Broker;
-// FIXME aconway 2007-01-17: Rename to ChannelAdapter.
-
/**
* Per-channel protocol adapter.
*
@@ -44,8 +43,7 @@ class Broker;
class BrokerAdapter : public qpid::framing::ChannelAdapter
{
public:
- // FIXME aconway 2007-01-18: takes ownership, should pass auto_ptr<Channel>
- BrokerAdapter(Channel* ch, Connection&, Broker&);
+ BrokerAdapter(std::auto_ptr<Channel> ch, Connection&, Broker&);
Channel& getChannel() { return *channel; }
void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>);
diff --git a/cpp/lib/broker/Connection.cpp b/cpp/lib/broker/Connection.cpp
index 0f58278a5a..5fcae39865 100644
--- a/cpp/lib/broker/Connection.cpp
+++ b/cpp/lib/broker/Connection.cpp
@@ -32,11 +32,11 @@ namespace qpid {
namespace broker {
Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) :
- framemax(65536),
- heartbeat(0),
broker(broker_),
settings(broker.getTimeout(), broker.getStagingThreshold()),
- out(out_)
+ out(out_),
+ framemax(65536),
+ heartbeat(0)
{}
Queue::shared_ptr Connection::getQueue(const string& name, u_int16_t channel){
@@ -106,11 +106,12 @@ void Connection::closeChannel(u_int16_t channel) {
BrokerAdapter& Connection::getAdapter(u_int16_t id) {
AdapterMap::iterator i = adapters.find(id);
if (i == adapters.end()) {
- Channel* ch=new Channel(
- client->getProtocolVersion(), out, id,
- framemax, broker.getQueues().getStore(),
- settings.stagingThreshold);
- BrokerAdapter* adapter = new BrokerAdapter(ch, *this, broker);
+ std::auto_ptr<Channel> ch(
+ new Channel(
+ client->getProtocolVersion(), out, id,
+ framemax, broker.getQueues().getStore(),
+ settings.stagingThreshold));
+ BrokerAdapter* adapter = new BrokerAdapter(ch, *this, broker);
adapters.insert(id, adapter);
return *adapter;
}
diff --git a/cpp/lib/broker/Connection.h b/cpp/lib/broker/Connection.h
index d5d90d4830..08d14d795a 100644
--- a/cpp/lib/broker/Connection.h
+++ b/cpp/lib/broker/Connection.h
@@ -61,9 +61,12 @@ class Connection : public qpid::sys::ConnectionInputHandler,
qpid::sys::ConnectionOutputHandler& getOutput() { return *out; }
- // FIXME aconway 2007-01-16: encapsulate.
- u_int32_t framemax;
- u_int16_t heartbeat;
+ u_int32_t getFrameMax() const { return framemax; }
+ u_int16_t getHeartbeat() const { return heartbeat; }
+
+ void setFrameMax(u_int32_t fm) { framemax = fm; }
+ void setHeartbeat(u_int16_t hb) { heartbeat = hb; }
+
Broker& broker;
std::auto_ptr<qpid::framing::AMQP_ClientProxy> client;
Settings settings;
@@ -94,7 +97,8 @@ class Connection : public qpid::sys::ConnectionInputHandler,
AdapterMap adapters;
qpid::sys::ConnectionOutputHandler* out;
-
+ u_int32_t framemax;
+ u_int16_t heartbeat;
};
}}
diff --git a/cpp/tests/InProcessBroker.h b/cpp/tests/InProcessBroker.h
index af0f4e84fe..cf2b9df8b0 100644
--- a/cpp/tests/InProcessBroker.h
+++ b/cpp/tests/InProcessBroker.h
@@ -49,16 +49,7 @@ framing::AMQFrame copy(framing::AMQFrame& from) {
*
* Also allows you to "snoop" on frames exchanged between client & broker.
*
- * Use as follows:
- *
- \code
- broker::InProcessBroker ibroker(version);
- client::Connection clientConnection;
- clientConnection.setConnector(ibroker);
- clientConnection.open("");
- ... use as normal
- \endcode
- *
+ * see FramingTest::testRequestResponseRoundtrip() for example of use.
*/
class InProcessBroker : public client::Connector {
public: