diff options
-rw-r--r-- | cpp/lib/broker/Broker.cpp | 24 | ||||
-rw-r--r-- | cpp/lib/broker/Broker.h | 12 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerAdapter.cpp | 18 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerAdapter.h | 6 | ||||
-rw-r--r-- | cpp/lib/broker/Connection.cpp | 17 | ||||
-rw-r--r-- | cpp/lib/broker/Connection.h | 12 | ||||
-rw-r--r-- | cpp/tests/InProcessBroker.h | 11 |
7 files changed, 55 insertions, 45 deletions
diff --git a/cpp/lib/broker/Broker.cpp b/cpp/lib/broker/Broker.cpp index b9e7990861..d3d68502f1 100644 --- a/cpp/lib/broker/Broker.cpp +++ b/cpp/lib/broker/Broker.cpp @@ -45,11 +45,8 @@ const std::string amq_topic("amq.topic"); const std::string amq_fanout("amq.fanout"); const std::string amq_match("amq.match"); -Broker::Broker(const Configuration& config) : - acceptor(Acceptor::create(config.getPort(), - config.getConnectionBacklog(), - config.getWorkerThreads(), - config.isTrace())), +Broker::Broker(const Configuration& conf) : + config(conf), queues(store.get()), timeout(30000), stagingThreshold(0), @@ -89,17 +86,30 @@ Broker::shared_ptr Broker::create(const Configuration& config) { } void Broker::run() { - acceptor->run(&factory); + getAcceptor().run(&factory); } void Broker::shutdown() { - acceptor->shutdown(); + getAcceptor().shutdown(); } Broker::~Broker() { shutdown(); } +int16_t Broker::getPort() const { return getAcceptor().getPort(); } + +Acceptor& Broker::getAcceptor() const { + if (!acceptor) + const_cast<Acceptor::shared_ptr&>(acceptor) = + Acceptor::create(config.getPort(), + config.getConnectionBacklog(), + config.getWorkerThreads(), + config.isTrace()); + return *acceptor; +} + + const int16_t Broker::DEFAULT_PORT(5672); diff --git a/cpp/lib/broker/Broker.h b/cpp/lib/broker/Broker.h index ef5e7bbafb..27d2fec006 100644 --- a/cpp/lib/broker/Broker.h +++ b/cpp/lib/broker/Broker.h @@ -42,8 +42,8 @@ namespace broker { /** * A broker instance. */ -class Broker : public qpid::sys::Runnable, - public qpid::SharedObject<Broker> +class Broker : public sys::Runnable, + public SharedObject<Broker> { public: static const int16_t DEFAULT_PORT; @@ -67,7 +67,7 @@ class Broker : public qpid::sys::Runnable, * port, which will be different if the configured port is * 0. */ - virtual int16_t getPort() const { return acceptor->getPort(); } + virtual int16_t getPort() const; /** * Run the broker. Implements Runnable::run() so the broker @@ -86,9 +86,11 @@ class Broker : public qpid::sys::Runnable, AutoDelete& getCleaner() { return cleaner; } private: - Broker(const Configuration& config); + Broker(const Configuration& config); + sys::Acceptor& getAcceptor() const; - qpid::sys::Acceptor::shared_ptr acceptor; + Configuration config; + sys::Acceptor::shared_ptr acceptor; std::auto_ptr<MessageStore> store; QueueRegistry queues; ExchangeRegistry exchanges; diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp index abf0b3852d..73ece8b264 100644 --- a/cpp/lib/broker/BrokerAdapter.cpp +++ b/cpp/lib/broker/BrokerAdapter.cpp @@ -183,14 +183,18 @@ class BrokerAdapter::ServerOps : public AMQP_ServerOperations void BrokerAdapter::ServerOps::ConnectionHandlerImpl::startOk( const MethodContext& context , const FieldTable& /*clientProperties*/, const string& /*mechanism*/, const string& /*response*/, const string& /*locale*/){ - connection.client->getConnection().tune(context, 100, connection.framemax, connection.heartbeat); + connection.client->getConnection().tune( + context, 100, connection.getFrameMax(), connection.getHeartbeat()); } void BrokerAdapter::ServerOps::ConnectionHandlerImpl::secureOk(const MethodContext&, const string& /*response*/){} -void BrokerAdapter::ServerOps::ConnectionHandlerImpl::tuneOk(const MethodContext&, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){ - connection.framemax = framemax; - connection.heartbeat = heartbeat; +void BrokerAdapter::ServerOps::ConnectionHandlerImpl::tuneOk( + const MethodContext&, u_int16_t /*channelmax*/, + u_int32_t framemax, u_int16_t heartbeat) +{ + connection.setFrameMax(framemax); + connection.setHeartbeat(heartbeat); } void BrokerAdapter::ServerOps::ConnectionHandlerImpl::open(const MethodContext& context, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){ @@ -496,14 +500,14 @@ BrokerAdapter::ServerOps::ChannelHandlerImpl::resume( } BrokerAdapter::BrokerAdapter( - Channel* ch, Connection& c, Broker& b + std::auto_ptr<Channel> ch, Connection& c, Broker& b ) : channel(ch), connection(c), broker(b), - serverOps(new ServerOps(*ch,c,b)) + serverOps(new ServerOps(*channel,c,b)) { - init(ch->getId(), c.getOutput(), ch->getVersion()); + init(channel->getId(), c.getOutput(), channel->getVersion()); } void BrokerAdapter::handleMethodInContext( diff --git a/cpp/lib/broker/BrokerAdapter.h b/cpp/lib/broker/BrokerAdapter.h index 339a7f6b48..b797576b39 100644 --- a/cpp/lib/broker/BrokerAdapter.h +++ b/cpp/lib/broker/BrokerAdapter.h @@ -18,6 +18,7 @@ * limitations under the License. * */ +#include <memory.h> #include "AMQP_ServerOperations.h" #include "BodyHandler.h" @@ -31,8 +32,6 @@ class AMQMethodBody; class Connection; class Broker; -// FIXME aconway 2007-01-17: Rename to ChannelAdapter. - /** * Per-channel protocol adapter. * @@ -44,8 +43,7 @@ class Broker; class BrokerAdapter : public qpid::framing::ChannelAdapter { public: - // FIXME aconway 2007-01-18: takes ownership, should pass auto_ptr<Channel> - BrokerAdapter(Channel* ch, Connection&, Broker&); + BrokerAdapter(std::auto_ptr<Channel> ch, Connection&, Broker&); Channel& getChannel() { return *channel; } void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>); diff --git a/cpp/lib/broker/Connection.cpp b/cpp/lib/broker/Connection.cpp index 0f58278a5a..5fcae39865 100644 --- a/cpp/lib/broker/Connection.cpp +++ b/cpp/lib/broker/Connection.cpp @@ -32,11 +32,11 @@ namespace qpid { namespace broker { Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) : - framemax(65536), - heartbeat(0), broker(broker_), settings(broker.getTimeout(), broker.getStagingThreshold()), - out(out_) + out(out_), + framemax(65536), + heartbeat(0) {} Queue::shared_ptr Connection::getQueue(const string& name, u_int16_t channel){ @@ -106,11 +106,12 @@ void Connection::closeChannel(u_int16_t channel) { BrokerAdapter& Connection::getAdapter(u_int16_t id) { AdapterMap::iterator i = adapters.find(id); if (i == adapters.end()) { - Channel* ch=new Channel( - client->getProtocolVersion(), out, id, - framemax, broker.getQueues().getStore(), - settings.stagingThreshold); - BrokerAdapter* adapter = new BrokerAdapter(ch, *this, broker); + std::auto_ptr<Channel> ch( + new Channel( + client->getProtocolVersion(), out, id, + framemax, broker.getQueues().getStore(), + settings.stagingThreshold)); + BrokerAdapter* adapter = new BrokerAdapter(ch, *this, broker); adapters.insert(id, adapter); return *adapter; } diff --git a/cpp/lib/broker/Connection.h b/cpp/lib/broker/Connection.h index d5d90d4830..08d14d795a 100644 --- a/cpp/lib/broker/Connection.h +++ b/cpp/lib/broker/Connection.h @@ -61,9 +61,12 @@ class Connection : public qpid::sys::ConnectionInputHandler, qpid::sys::ConnectionOutputHandler& getOutput() { return *out; } - // FIXME aconway 2007-01-16: encapsulate. - u_int32_t framemax; - u_int16_t heartbeat; + u_int32_t getFrameMax() const { return framemax; } + u_int16_t getHeartbeat() const { return heartbeat; } + + void setFrameMax(u_int32_t fm) { framemax = fm; } + void setHeartbeat(u_int16_t hb) { heartbeat = hb; } + Broker& broker; std::auto_ptr<qpid::framing::AMQP_ClientProxy> client; Settings settings; @@ -94,7 +97,8 @@ class Connection : public qpid::sys::ConnectionInputHandler, AdapterMap adapters; qpid::sys::ConnectionOutputHandler* out; - + u_int32_t framemax; + u_int16_t heartbeat; }; }} diff --git a/cpp/tests/InProcessBroker.h b/cpp/tests/InProcessBroker.h index af0f4e84fe..cf2b9df8b0 100644 --- a/cpp/tests/InProcessBroker.h +++ b/cpp/tests/InProcessBroker.h @@ -49,16 +49,7 @@ framing::AMQFrame copy(framing::AMQFrame& from) { * * Also allows you to "snoop" on frames exchanged between client & broker. * - * Use as follows: - * - \code - broker::InProcessBroker ibroker(version); - client::Connection clientConnection; - clientConnection.setConnector(ibroker); - clientConnection.open(""); - ... use as normal - \endcode - * + * see FramingTest::testRequestResponseRoundtrip() for example of use. */ class InProcessBroker : public client::Connector { public: |