diff options
author | Gordon Sim <gsim@apache.org> | 2007-07-09 10:07:26 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-07-09 10:07:26 +0000 |
commit | c4bf499790c30e0c98dd560c50c64c8a27fd9b89 (patch) | |
tree | c1f439bb86e32027c1aea5ec4e78f291737e8230 /cpp/src | |
parent | 32fe78d370e0572a5ed21ff3e84f668d8a2f2a49 (diff) | |
download | qpid-python-c4bf499790c30e0c98dd560c50c64c8a27fd9b89.tar.gz |
refactoring:
* separated out the connection level method handling from semantic level (session/channel level should also be separated)
* reduce coupling between Connection and Channel
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@554590 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Makefile.am | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.cpp | 58 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.h | 31 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.cpp | 15 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 40 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.h | 13 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ConnectionAdapter.cpp | 124 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ConnectionAdapter.h | 104 | ||||
-rw-r--r-- | cpp/src/qpid/broker/HandlerImpl.h | 26 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageHandlerImpl.cpp | 5 | ||||
-rw-r--r-- | cpp/src/tests/BrokerChannelTest.cpp | 13 |
12 files changed, 296 insertions, 143 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 28216649cc..bbb1c6655e 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -193,6 +193,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/BrokerMessageMessage.cpp \ qpid/broker/BrokerQueue.cpp \ qpid/broker/Connection.cpp \ + qpid/broker/ConnectionAdapter.cpp \ qpid/broker/ConnectionFactory.cpp \ qpid/broker/Daemon.cpp \ qpid/broker/DeliverableMessage.cpp \ @@ -289,6 +290,7 @@ nobase_include_HEADERS = \ qpid/broker/BrokerMessageMessage.h \ qpid/broker/BrokerSingleton.h \ qpid/broker/Connection.h \ + qpid/broker/ConnectionAdapter.h \ qpid/broker/ConnectionFactory.h \ qpid/broker/ConnectionToken.h \ qpid/broker/Content.h \ diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index dc8cd6cce1..bbf6686a6c 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -38,7 +38,6 @@ BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b) : connection(c), basicHandler(*this), channelHandler(*this), - connectionHandler(*this), exchangeHandler(*this), bindingHandler(*this), messageHandler(*this), @@ -51,47 +50,6 @@ BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b) : ProtocolVersion BrokerAdapter::getVersion() const { return connection.getVersion(); } - -void BrokerAdapter::ConnectionHandlerImpl::startOk( - const MethodContext&, const FieldTable& /*clientProperties*/, - const string& /*mechanism*/, - const string& /*response*/, const string& /*locale*/) -{ - client.tune( - CHANNEL_MAX, connection.getFrameMax(), connection.getHeartbeat()); -} - -void BrokerAdapter::ConnectionHandlerImpl::secureOk( - const MethodContext&, const string& /*response*/){} - -void BrokerAdapter::ConnectionHandlerImpl::tuneOk( - const MethodContext&, uint16_t /*channelmax*/, - uint32_t framemax, uint16_t heartbeat) -{ - connection.setFrameMax(framemax); - connection.setHeartbeat(heartbeat); -} - -void BrokerAdapter::ConnectionHandlerImpl::open( - const MethodContext& context, const string& /*virtualHost*/, - const string& /*capabilities*/, bool /*insist*/) -{ - string knownhosts; - client.openOk( - knownhosts, context.getRequestId()); -} - -void BrokerAdapter::ConnectionHandlerImpl::close( - const MethodContext& context, uint16_t /*replyCode*/, const string& /*replyText*/, - uint16_t /*classId*/, uint16_t /*methodId*/) -{ - client.closeOk(context.getRequestId()); - connection.getOutput().close(); -} - -void BrokerAdapter::ConnectionHandlerImpl::closeOk(const MethodContext&){ - connection.getOutput().close(); -} void BrokerAdapter::ChannelHandlerImpl::open( const MethodContext& context, const string& /*outOfBand*/){ @@ -208,7 +166,7 @@ void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, uint bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ Queue::shared_ptr queue; if (passive && !name.empty()) { - queue = connection.getQueue(name, channel.getId()); + queue = getQueue(name); } else { std::pair<Queue::shared_ptr, bool> queue_created = broker.getQueues().declare( @@ -249,7 +207,7 @@ void BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, uint16_ const string& exchangeName, const string& routingKey, bool nowait, const FieldTable& arguments){ - Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); + Queue::shared_ptr queue = getQueue(queueName); Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName); if(exchange){ string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; @@ -275,7 +233,7 @@ BrokerAdapter::QueueHandlerImpl::unbind( const string& routingKey, const qpid::framing::FieldTable& arguments ) { - Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); + Queue::shared_ptr queue = getQueue(queueName); if (!queue.get()) throw ChannelException(404, "Unbind failed. No such exchange: " + exchangeName); Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName); @@ -290,7 +248,7 @@ BrokerAdapter::QueueHandlerImpl::unbind( void BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, bool nowait){ - Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); + Queue::shared_ptr queue = getQueue(queueName); int count = queue->purge(); if(!nowait) client.purgeOk( count, context.getRequestId()); } @@ -299,7 +257,7 @@ void BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, uint bool ifUnused, bool ifEmpty, bool nowait){ ChannelException error(0, ""); int count(0); - Queue::shared_ptr q = connection.getQueue(queue, channel.getId()); + Queue::shared_ptr q = getQueue(queue); if(ifEmpty && q->getMessageCount() > 0){ throw ChannelException(406, "Queue not empty."); }else if(ifUnused && q->getConsumerCount() > 0){ @@ -337,7 +295,7 @@ void BrokerAdapter::BasicHandlerImpl::consume( bool nowait, const FieldTable& fields) { - Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); + Queue::shared_ptr queue = getQueue(queueName); if(!consumerTag.empty() && channel.exists(consumerTag)){ throw ConnectionException(530, "Consumer tags must be unique"); } @@ -377,8 +335,8 @@ void BrokerAdapter::BasicHandlerImpl::publish( } void BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, bool noAck){ - Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); - if(!connection.getChannel(channel.getId()).get(queue, "", !noAck)){ + Queue::shared_ptr queue = getQueue(queueName); + if(!channel.get(queue, "", !noAck)){ string clusterId;//not used, part of an imatix hack client.getEmpty(clusterId, context.getRequestId()); diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h index 01ece30cfa..c66bdb3a31 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.h +++ b/cpp/src/qpid/broker/BrokerAdapter.h @@ -60,7 +60,6 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations framing::ProtocolVersion getVersion() const; ChannelHandler* getChannelHandler() { return &channelHandler; } - ConnectionHandler* getConnectionHandler() { return &connectionHandler; } BasicHandler* getBasicHandler() { return &basicHandler; } ExchangeHandler* getExchangeHandler() { return &exchangeHandler; } BindingHandler* getBindingHandler() { return &bindingHandler; } @@ -81,35 +80,14 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; } DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; } + ConnectionHandler* getConnectionHandler() { + throw ConnectionException(503, "Can't access connection class on non-zero channel!"); + } + framing::AMQP_ClientProxy& getProxy() { return proxy; } private: - class ConnectionHandlerImpl : - public ConnectionHandler, - public HandlerImpl<framing::AMQP_ClientProxy::Connection> - { - public: - ConnectionHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} - - void startOk(const framing::MethodContext& context, - const qpid::framing::FieldTable& clientProperties, - const std::string& mechanism, const std::string& response, - const std::string& locale); - void secureOk(const framing::MethodContext& context, - const std::string& response); - void tuneOk(const framing::MethodContext& context, - uint16_t channelMax, - uint32_t frameMax, uint16_t heartbeat); - void open(const framing::MethodContext& context, - const std::string& virtualHost, - const std::string& capabilities, bool insist); - void close(const framing::MethodContext& context, uint16_t replyCode, - const std::string& replyText, - uint16_t classId, uint16_t methodId); - void closeOk(const framing::MethodContext& context); - }; - class ChannelHandlerImpl : public ChannelHandler, public HandlerImpl<framing::AMQP_ClientProxy::Channel> @@ -231,7 +209,6 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations Connection& connection; BasicHandlerImpl basicHandler; ChannelHandlerImpl channelHandler; - ConnectionHandlerImpl connectionHandler; ExchangeHandlerImpl exchangeHandler; BindingHandlerImpl bindingHandler; MessageHandlerImpl messageHandler; diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp index 3d9eab4433..c81e73aba1 100644 --- a/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/cpp/src/qpid/broker/BrokerChannel.cpp @@ -50,22 +50,17 @@ using namespace qpid::framing; using namespace qpid::sys; -Channel::Channel( - Connection& con, ChannelId id, - uint32_t _framesize, MessageStore* const _store, - uint64_t _stagingThreshold -) : +Channel::Channel(Connection& con, ChannelId id, MessageStore* const _store) : ChannelAdapter(), connection(con), currentDeliveryTag(1), prefetchSize(0), prefetchCount(0), - framesize(_framesize), tagGenerator("sgen"), dtxSelected(false), accumulatedAck(0), store(_store), - messageBuilder(this, _store, _stagingThreshold), + messageBuilder(this, _store, connection.getStagingThreshold()), opened(id == 0),//channel 0 is automatically open, other must be explicitly opened flowActive(true), adapter(new BrokerAdapter(*this, con, con.broker)) @@ -215,7 +210,7 @@ void Channel::deliver( outstanding.count++; } //send deliver method, header and content(s) - msg->deliver(*this, consumerTag, deliveryTag, framesize); + msg->deliver(*this, consumerTag, deliveryTag, connection.getFrameMax()); } bool Channel::checkPrefetch(Message::shared_ptr& msg){ @@ -378,7 +373,7 @@ bool Channel::get(Queue::shared_ptr queue, const string& destination, bool ackEx msg->sendGetOk(MethodContext(this, msg->getRespondTo()), destination, queue->getMessageCount() + 1, myDeliveryTag, - framesize); + connection.getFrameMax()); if(ackExpected){ unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); } @@ -391,7 +386,7 @@ bool Channel::get(Queue::shared_ptr queue, const string& destination, bool ackEx void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, uint64_t deliveryTag) { - msg->deliver(*this, consumerTag, deliveryTag, framesize); + msg->deliver(*this, consumerTag, deliveryTag, connection.getFrameMax()); } void Channel::handleMethodInContext( diff --git a/cpp/src/qpid/broker/BrokerChannel.h b/cpp/src/qpid/broker/BrokerChannel.h index 0529caed5f..9212e8f632 100644 --- a/cpp/src/qpid/broker/BrokerChannel.h +++ b/cpp/src/qpid/broker/BrokerChannel.h @@ -86,7 +86,6 @@ class Channel : public framing::ChannelAdapter, uint32_t prefetchSize; uint16_t prefetchCount; Prefetch outstanding; - uint32_t framesize; NameGenerator tagGenerator; std::list<DeliveryRecord> unacked; sys::Mutex deliveryLock; @@ -110,12 +109,7 @@ class Channel : public framing::ChannelAdapter, void checkDtxTimeout(); public: - Channel(Connection& parent, - framing::ChannelId id, - uint32_t framesize, - MessageStore* const _store = 0, - uint64_t stagingThreshold = 0); - + Channel(Connection& parent, framing::ChannelId id, MessageStore* const store = 0); ~Channel(); bool isOpen() const { return opened; } diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index ce9e4865db..cdbcee1c69 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -41,51 +41,34 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) : framemax(65536), heartbeat(0), client(0), - stagingThreshold(broker.getStagingThreshold()) + stagingThreshold(broker.getStagingThreshold()), + adapter(*this) {} -Queue::shared_ptr Connection::getQueue(const string& name, uint16_t channel){ - Queue::shared_ptr queue; - if (name.empty()) { - queue = getChannel(channel).getDefaultQueue(); - if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" ); - } else { - queue = broker.getQueues().find(name); - if (queue == 0) { - throw ChannelException( 404, "Queue not found: " + name); - } - } - return queue; -} - - Exchange::shared_ptr Connection::findExchange(const string& name){ return broker.getExchanges().get(name); } void Connection::received(framing::AMQFrame& frame){ - getChannel((frame.getChannel())).getHandlers().in->handle(frame); + if (frame.getChannel() == 0) { + adapter.handle(frame); + } else { + getChannel((frame.getChannel())).getHandlers().in->handle(frame); + } } void Connection::close( ReplyCode code, const string& text, ClassId classId, MethodId methodId) { - client->close(code, text, classId, methodId); + adapter.close(code, text, classId, methodId); getOutput().close(); } void Connection::initiated(const framing::ProtocolInitiation& header) { version = ProtocolVersion(header.getMajor(), header.getMinor()); - FieldTable properties; - string mechanisms("PLAIN"); - string locales("en_US"); - getChannel(0).init(0, *out, getVersion()); - client = &getChannel(0).getAdapter().getProxy().getConnection(); - client->start( - header.getMajor(), header.getMinor(), - properties, mechanisms, locales); + adapter.init(header); } void Connection::idleOut(){} @@ -117,10 +100,7 @@ void Connection::closeChannel(uint16_t id) { Channel& Connection::getChannel(ChannelId id) { ChannelMap::iterator i = channels.find(id); if (i == channels.end()) { - i = channels.insert( - id, new Channel( - *this, id, framemax, broker.getQueues().getStore(), - broker.getStagingThreshold())).first; + i = channels.insert(id, new Channel(*this, id, &broker.getStore())).first; } return *i; } diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index 259a74f808..a885ac4065 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -29,6 +29,7 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/AMQP_ServerOperations.h" #include "qpid/framing/AMQP_ClientProxy.h" +#include "qpid/framing/ChannelAdapter.h" #include "qpid/sys/ConnectionOutputHandler.h" #include "qpid/sys/ConnectionInputHandler.h" #include "qpid/sys/TimeoutHandler.h" @@ -36,6 +37,7 @@ #include "Broker.h" #include "qpid/Exception.h" #include "BrokerChannel.h" +#include "ConnectionAdapter.h" namespace qpid { namespace broker { @@ -66,14 +68,8 @@ class Connection : public sys::ConnectionInputHandler, void setFrameMax(uint32_t fm) { framemax = fm; } void setHeartbeat(uint16_t hb) { heartbeat = hb; } + void setStagingThreshold(uint64_t st) { stagingThreshold = st; } - /** - * Get named queue, never returns 0. - * @return: named queue or default queue for channel if name="" - * @exception: ChannelException if no queue of that name is found. - * @exception: ConnectionException if name="" and channel has no default. - */ - Queue::shared_ptr getQueue(const string& name, uint16_t channel); Broker& broker; std::vector<Queue::shared_ptr> exclusiveQueues; @@ -97,7 +93,8 @@ class Connection : public sys::ConnectionInputHandler, uint32_t framemax; uint16_t heartbeat; framing::AMQP_ClientProxy::Connection* client; - const uint64_t stagingThreshold; + uint64_t stagingThreshold; + ConnectionAdapter adapter; }; diff --git a/cpp/src/qpid/broker/ConnectionAdapter.cpp b/cpp/src/qpid/broker/ConnectionAdapter.cpp new file mode 100644 index 0000000000..8a4450c881 --- /dev/null +++ b/cpp/src/qpid/broker/ConnectionAdapter.cpp @@ -0,0 +1,124 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ConnectionAdapter.h" +#include "Connection.h" +#include "qpid/framing/MethodContext.h" + +using namespace qpid; +using namespace qpid::broker; +using qpid::framing::ReplyCode; +using qpid::framing::ClassId; +using qpid::framing::MethodId; +using qpid::framing::MethodContext; +using qpid::framing::FieldTable; + +void ConnectionAdapter::init(const framing::ProtocolInitiation& header) { + ChannelAdapter::init(0, handler->connection.getOutput(), handler->connection.getVersion()); + FieldTable properties; + string mechanisms("PLAIN"); + string locales("en_US"); + handler->client.start(header.getMajor(), header.getMinor(), properties, mechanisms, locales); +} + +void ConnectionAdapter::close(ReplyCode code, const string& text, ClassId classId, MethodId methodId) +{ + handler->client.close(code, text, classId, methodId); +} + +void ConnectionAdapter::handleMethodInContext( + boost::shared_ptr<qpid::framing::AMQMethodBody> method, + const MethodContext& context +) +{ + try{ + method->invoke(*this, context); + }catch(ConnectionException& e){ + handler->client.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); + }catch(std::exception& e){ + handler->client.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); + } +} + +framing::AMQP_ServerOperations::ConnectionHandler* ConnectionAdapter::getConnectionHandler() +{ + return handler.get(); +} + +framing::ProtocolVersion ConnectionAdapter::getVersion() const +{ + return handler->connection.getVersion(); +} + +void ConnectionAdapter::handle(framing::AMQFrame& frame) +{ + getHandlers().in->handle(frame); +} + +ConnectionAdapter::ConnectionAdapter(Connection& connection) +{ + handler = std::auto_ptr<Handler>(new Handler(connection, *this)); +} + +Handler::Handler(Connection& c, ConnectionAdapter& a) : + proxy(a), client(proxy.getConnection()), connection(c) {} + + +void Handler::startOk( + const MethodContext&, const FieldTable& /*clientProperties*/, + const string& /*mechanism*/, + const string& /*response*/, const string& /*locale*/) +{ + client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), connection.getHeartbeat()); +} + +void Handler::secureOk( + const MethodContext&, const string& /*response*/){} + +void Handler::tuneOk( + const MethodContext&, uint16_t /*channelmax*/, + uint32_t framemax, uint16_t heartbeat) +{ + connection.setFrameMax(framemax); + connection.setHeartbeat(heartbeat); +} + +void Handler::open( + const MethodContext& context, const string& /*virtualHost*/, + const string& /*capabilities*/, bool /*insist*/) +{ + string knownhosts; + client.openOk( + knownhosts, context.getRequestId()); +} + + +void Handler::close( + const MethodContext& context, uint16_t /*replyCode*/, const string& /*replyText*/, + uint16_t /*classId*/, uint16_t /*methodId*/) +{ + client.closeOk(context.getRequestId()); + connection.getOutput().close(); +} + +void Handler::closeOk(const MethodContext&){ + connection.getOutput().close(); +} diff --git a/cpp/src/qpid/broker/ConnectionAdapter.h b/cpp/src/qpid/broker/ConnectionAdapter.h new file mode 100644 index 0000000000..383fbf84c0 --- /dev/null +++ b/cpp/src/qpid/broker/ConnectionAdapter.h @@ -0,0 +1,104 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _ConnectionAdapter_ +#define _ConnectionAdapter_ + +#include <memory> +#include "qpid/framing/amqp_types.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/AMQP_ServerOperations.h" +#include "qpid/framing/AMQP_ClientProxy.h" +#include "qpid/framing/ChannelAdapter.h" +#include "qpid/framing/ProtocolInitiation.h" +#include "qpid/framing/ProtocolVersion.h" +#include "qpid/Exception.h" + +namespace qpid { +namespace broker { + +class Connection; +struct Handler; + +class ConnectionAdapter : public framing::ChannelAdapter, public framing::AMQP_ServerOperations +{ + std::auto_ptr<Handler> handler; +public: + ConnectionAdapter(Connection& connection); + void init(const framing::ProtocolInitiation& header); + void close(framing::ReplyCode code, const std::string& text, framing::ClassId classId, framing::MethodId methodId); + void handle(framing::AMQFrame& frame); + + //ChannelAdapter virtual methods: + void handleMethodInContext(boost::shared_ptr<qpid::framing::AMQMethodBody> method, + const qpid::framing::MethodContext& context); + bool isOpen() const { return true; } //channel 0 is always open + //never needed: + void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>) {} + void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>) {} + void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>) {} + + //AMQP_ServerOperations: + ConnectionHandler* getConnectionHandler(); + ChannelHandler* getChannelHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } + BasicHandler* getBasicHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } + ExchangeHandler* getExchangeHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } + BindingHandler* getBindingHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } + QueueHandler* getQueueHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } + TxHandler* getTxHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } + MessageHandler* getMessageHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } + AccessHandler* getAccessHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } + FileHandler* getFileHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } + StreamHandler* getStreamHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } + DtxHandler* getDtxHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } + TunnelHandler* getTunnelHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } + DtxCoordinationHandler* getDtxCoordinationHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } + DtxDemarcationHandler* getDtxDemarcationHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } + framing::ProtocolVersion getVersion() const; +}; + +struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler +{ + framing::AMQP_ClientProxy proxy; + framing::AMQP_ClientProxy::Connection client; + Connection& connection; + + Handler(Connection& connection, ConnectionAdapter& adapter); + void startOk(const framing::MethodContext& context, + const qpid::framing::FieldTable& clientProperties, + const std::string& mechanism, const std::string& response, + const std::string& locale); + void secureOk(const framing::MethodContext& context, + const std::string& response); + void tuneOk(const framing::MethodContext& context, + uint16_t channelMax, + uint32_t frameMax, uint16_t heartbeat); + void open(const framing::MethodContext& context, + const std::string& virtualHost, + const std::string& capabilities, bool insist); + void close(const framing::MethodContext& context, uint16_t replyCode, + const std::string& replyText, + uint16_t classId, uint16_t methodId); + void closeOk(const framing::MethodContext& context); +}; + +}} + +#endif diff --git a/cpp/src/qpid/broker/HandlerImpl.h b/cpp/src/qpid/broker/HandlerImpl.h index 338ebca4b7..008be10867 100644 --- a/cpp/src/qpid/broker/HandlerImpl.h +++ b/cpp/src/qpid/broker/HandlerImpl.h @@ -19,6 +19,7 @@ * */ +#include "Broker.h" #include "BrokerChannel.h" #include "qpid/framing/AMQP_ClientProxy.h" @@ -30,8 +31,7 @@ class AMQP_ClientProxy; namespace broker { -class Broker; -class Channel; + //class Channel; class Connection; /** @@ -47,6 +47,28 @@ struct CoreRefs Connection& connection; Broker& broker; framing::AMQP_ClientProxy proxy; + + /** + * Get named queue, never returns 0. + * @return: named queue or default queue for channel if name="" + * @exception: ChannelException if no queue of that name is found. + * @exception: ConnectionException if name="" and channel has no default. + */ + Queue::shared_ptr getQueue(const string& name) { + //Note: this can be removed soon as the default queue for channels is scrapped in 0-10 + Queue::shared_ptr queue; + if (name.empty()) { + queue = channel.getDefaultQueue(); + if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" ); + } else { + queue = broker.getQueues().find(name); + if (queue == 0) { + throw ChannelException( 404, "Queue not found: " + name); + } + } + return queue; + } + }; diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index 22011169a2..bbfcf209ad 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -123,7 +123,7 @@ MessageHandlerImpl::consume(const MethodContext& context, bool exclusive, const framing::FieldTable& filter ) { - Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); + Queue::shared_ptr queue = getQueue(queueName); if(!destination.empty() && channel.exists(destination)) throw ConnectionException(530, "Consumer tags must be unique"); string tag = destination; @@ -142,8 +142,7 @@ MessageHandlerImpl::get( const MethodContext& context, const string& destination, bool noAck ) { - Queue::shared_ptr queue = - connection.getQueue(queueName, context.channel->getId()); + Queue::shared_ptr queue = getQueue(queueName); if(channel.get(queue, destination, !noAck)) client.ok(context.getRequestId()); diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp index 29ed1ae230..929105f6e3 100644 --- a/cpp/src/tests/BrokerChannelTest.cpp +++ b/cpp/src/tests/BrokerChannelTest.cpp @@ -154,7 +154,7 @@ class BrokerChannelTest : public CppUnit::TestCase void testConsumerMgmt(){ Queue::shared_ptr queue(new Queue("my_queue")); - Channel channel(connection, 0, 0, 0); + Channel channel(connection, 0, 0); channel.open(); CPPUNIT_ASSERT(!channel.exists("my_consumer")); @@ -179,7 +179,7 @@ class BrokerChannelTest : public CppUnit::TestCase } void testDeliveryNoAck(){ - Channel channel(connection, 7, 10000); + Channel channel(connection, 7); channel.open(); const string data("abcdefghijklmn"); Message::shared_ptr msg( @@ -209,7 +209,7 @@ class BrokerChannelTest : public CppUnit::TestCase } void testDeliveryAndRecovery(){ - Channel channel(connection, 7, 10000); + Channel channel(connection, 7); channel.open(); const string data("abcdefghijklmn"); @@ -241,8 +241,9 @@ class BrokerChannelTest : public CppUnit::TestCase void testStaging(){ MockMessageStore store; - Channel channel( - connection, 1, 1000/*framesize*/, &store, 10/*staging threshold*/); + connection.setFrameMax(1000); + connection.setStagingThreshold(10); + Channel channel(connection, 1, &store); const string data[] = {"abcde", "fghij", "klmno"}; Message* msg = new BasicMessage( @@ -335,7 +336,7 @@ class BrokerChannelTest : public CppUnit::TestCase } void testFlow(){ - Channel channel(connection, 7, 10000); + Channel channel(connection, 7); channel.open(); //there will always be a connection-start frame CPPUNIT_ASSERT_EQUAL((size_t) 1, handler.frames.size()); |