summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-07-09 10:07:26 +0000
committerGordon Sim <gsim@apache.org>2007-07-09 10:07:26 +0000
commitc4bf499790c30e0c98dd560c50c64c8a27fd9b89 (patch)
treec1f439bb86e32027c1aea5ec4e78f291737e8230 /cpp/src
parent32fe78d370e0572a5ed21ff3e84f668d8a2f2a49 (diff)
downloadqpid-python-c4bf499790c30e0c98dd560c50c64c8a27fd9b89.tar.gz
refactoring:
* separated out the connection level method handling from semantic level (session/channel level should also be separated) * reduce coupling between Connection and Channel git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@554590 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am2
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp58
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.h31
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.cpp15
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.h8
-rw-r--r--cpp/src/qpid/broker/Connection.cpp40
-rw-r--r--cpp/src/qpid/broker/Connection.h13
-rw-r--r--cpp/src/qpid/broker/ConnectionAdapter.cpp124
-rw-r--r--cpp/src/qpid/broker/ConnectionAdapter.h104
-rw-r--r--cpp/src/qpid/broker/HandlerImpl.h26
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp5
-rw-r--r--cpp/src/tests/BrokerChannelTest.cpp13
12 files changed, 296 insertions, 143 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 28216649cc..bbb1c6655e 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -193,6 +193,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/BrokerMessageMessage.cpp \
qpid/broker/BrokerQueue.cpp \
qpid/broker/Connection.cpp \
+ qpid/broker/ConnectionAdapter.cpp \
qpid/broker/ConnectionFactory.cpp \
qpid/broker/Daemon.cpp \
qpid/broker/DeliverableMessage.cpp \
@@ -289,6 +290,7 @@ nobase_include_HEADERS = \
qpid/broker/BrokerMessageMessage.h \
qpid/broker/BrokerSingleton.h \
qpid/broker/Connection.h \
+ qpid/broker/ConnectionAdapter.h \
qpid/broker/ConnectionFactory.h \
qpid/broker/ConnectionToken.h \
qpid/broker/Content.h \
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index dc8cd6cce1..bbf6686a6c 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -38,7 +38,6 @@ BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b) :
connection(c),
basicHandler(*this),
channelHandler(*this),
- connectionHandler(*this),
exchangeHandler(*this),
bindingHandler(*this),
messageHandler(*this),
@@ -51,47 +50,6 @@ BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b) :
ProtocolVersion BrokerAdapter::getVersion() const {
return connection.getVersion();
}
-
-void BrokerAdapter::ConnectionHandlerImpl::startOk(
- const MethodContext&, const FieldTable& /*clientProperties*/,
- const string& /*mechanism*/,
- const string& /*response*/, const string& /*locale*/)
-{
- client.tune(
- CHANNEL_MAX, connection.getFrameMax(), connection.getHeartbeat());
-}
-
-void BrokerAdapter::ConnectionHandlerImpl::secureOk(
- const MethodContext&, const string& /*response*/){}
-
-void BrokerAdapter::ConnectionHandlerImpl::tuneOk(
- const MethodContext&, uint16_t /*channelmax*/,
- uint32_t framemax, uint16_t heartbeat)
-{
- connection.setFrameMax(framemax);
- connection.setHeartbeat(heartbeat);
-}
-
-void BrokerAdapter::ConnectionHandlerImpl::open(
- const MethodContext& context, const string& /*virtualHost*/,
- const string& /*capabilities*/, bool /*insist*/)
-{
- string knownhosts;
- client.openOk(
- knownhosts, context.getRequestId());
-}
-
-void BrokerAdapter::ConnectionHandlerImpl::close(
- const MethodContext& context, uint16_t /*replyCode*/, const string& /*replyText*/,
- uint16_t /*classId*/, uint16_t /*methodId*/)
-{
- client.closeOk(context.getRequestId());
- connection.getOutput().close();
-}
-
-void BrokerAdapter::ConnectionHandlerImpl::closeOk(const MethodContext&){
- connection.getOutput().close();
-}
void BrokerAdapter::ChannelHandlerImpl::open(
const MethodContext& context, const string& /*outOfBand*/){
@@ -208,7 +166,7 @@ void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, uint
bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
Queue::shared_ptr queue;
if (passive && !name.empty()) {
- queue = connection.getQueue(name, channel.getId());
+ queue = getQueue(name);
} else {
std::pair<Queue::shared_ptr, bool> queue_created =
broker.getQueues().declare(
@@ -249,7 +207,7 @@ void BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, uint16_
const string& exchangeName, const string& routingKey, bool nowait,
const FieldTable& arguments){
- Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
+ Queue::shared_ptr queue = getQueue(queueName);
Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
if(exchange){
string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
@@ -275,7 +233,7 @@ BrokerAdapter::QueueHandlerImpl::unbind(
const string& routingKey,
const qpid::framing::FieldTable& arguments )
{
- Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
+ Queue::shared_ptr queue = getQueue(queueName);
if (!queue.get()) throw ChannelException(404, "Unbind failed. No such exchange: " + exchangeName);
Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
@@ -290,7 +248,7 @@ BrokerAdapter::QueueHandlerImpl::unbind(
void BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, bool nowait){
- Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
+ Queue::shared_ptr queue = getQueue(queueName);
int count = queue->purge();
if(!nowait) client.purgeOk( count, context.getRequestId());
}
@@ -299,7 +257,7 @@ void BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, uint
bool ifUnused, bool ifEmpty, bool nowait){
ChannelException error(0, "");
int count(0);
- Queue::shared_ptr q = connection.getQueue(queue, channel.getId());
+ Queue::shared_ptr q = getQueue(queue);
if(ifEmpty && q->getMessageCount() > 0){
throw ChannelException(406, "Queue not empty.");
}else if(ifUnused && q->getConsumerCount() > 0){
@@ -337,7 +295,7 @@ void BrokerAdapter::BasicHandlerImpl::consume(
bool nowait, const FieldTable& fields)
{
- Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
+ Queue::shared_ptr queue = getQueue(queueName);
if(!consumerTag.empty() && channel.exists(consumerTag)){
throw ConnectionException(530, "Consumer tags must be unique");
}
@@ -377,8 +335,8 @@ void BrokerAdapter::BasicHandlerImpl::publish(
}
void BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, bool noAck){
- Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
- if(!connection.getChannel(channel.getId()).get(queue, "", !noAck)){
+ Queue::shared_ptr queue = getQueue(queueName);
+ if(!channel.get(queue, "", !noAck)){
string clusterId;//not used, part of an imatix hack
client.getEmpty(clusterId, context.getRequestId());
diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h
index 01ece30cfa..c66bdb3a31 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.h
+++ b/cpp/src/qpid/broker/BrokerAdapter.h
@@ -60,7 +60,6 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
framing::ProtocolVersion getVersion() const;
ChannelHandler* getChannelHandler() { return &channelHandler; }
- ConnectionHandler* getConnectionHandler() { return &connectionHandler; }
BasicHandler* getBasicHandler() { return &basicHandler; }
ExchangeHandler* getExchangeHandler() { return &exchangeHandler; }
BindingHandler* getBindingHandler() { return &bindingHandler; }
@@ -81,35 +80,14 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; }
DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; }
+ ConnectionHandler* getConnectionHandler() {
+ throw ConnectionException(503, "Can't access connection class on non-zero channel!");
+ }
+
framing::AMQP_ClientProxy& getProxy() { return proxy; }
private:
- class ConnectionHandlerImpl :
- public ConnectionHandler,
- public HandlerImpl<framing::AMQP_ClientProxy::Connection>
- {
- public:
- ConnectionHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
-
- void startOk(const framing::MethodContext& context,
- const qpid::framing::FieldTable& clientProperties,
- const std::string& mechanism, const std::string& response,
- const std::string& locale);
- void secureOk(const framing::MethodContext& context,
- const std::string& response);
- void tuneOk(const framing::MethodContext& context,
- uint16_t channelMax,
- uint32_t frameMax, uint16_t heartbeat);
- void open(const framing::MethodContext& context,
- const std::string& virtualHost,
- const std::string& capabilities, bool insist);
- void close(const framing::MethodContext& context, uint16_t replyCode,
- const std::string& replyText,
- uint16_t classId, uint16_t methodId);
- void closeOk(const framing::MethodContext& context);
- };
-
class ChannelHandlerImpl :
public ChannelHandler,
public HandlerImpl<framing::AMQP_ClientProxy::Channel>
@@ -231,7 +209,6 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
Connection& connection;
BasicHandlerImpl basicHandler;
ChannelHandlerImpl channelHandler;
- ConnectionHandlerImpl connectionHandler;
ExchangeHandlerImpl exchangeHandler;
BindingHandlerImpl bindingHandler;
MessageHandlerImpl messageHandler;
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp
index 3d9eab4433..c81e73aba1 100644
--- a/cpp/src/qpid/broker/BrokerChannel.cpp
+++ b/cpp/src/qpid/broker/BrokerChannel.cpp
@@ -50,22 +50,17 @@ using namespace qpid::framing;
using namespace qpid::sys;
-Channel::Channel(
- Connection& con, ChannelId id,
- uint32_t _framesize, MessageStore* const _store,
- uint64_t _stagingThreshold
-) :
+Channel::Channel(Connection& con, ChannelId id, MessageStore* const _store) :
ChannelAdapter(),
connection(con),
currentDeliveryTag(1),
prefetchSize(0),
prefetchCount(0),
- framesize(_framesize),
tagGenerator("sgen"),
dtxSelected(false),
accumulatedAck(0),
store(_store),
- messageBuilder(this, _store, _stagingThreshold),
+ messageBuilder(this, _store, connection.getStagingThreshold()),
opened(id == 0),//channel 0 is automatically open, other must be explicitly opened
flowActive(true),
adapter(new BrokerAdapter(*this, con, con.broker))
@@ -215,7 +210,7 @@ void Channel::deliver(
outstanding.count++;
}
//send deliver method, header and content(s)
- msg->deliver(*this, consumerTag, deliveryTag, framesize);
+ msg->deliver(*this, consumerTag, deliveryTag, connection.getFrameMax());
}
bool Channel::checkPrefetch(Message::shared_ptr& msg){
@@ -378,7 +373,7 @@ bool Channel::get(Queue::shared_ptr queue, const string& destination, bool ackEx
msg->sendGetOk(MethodContext(this, msg->getRespondTo()),
destination,
queue->getMessageCount() + 1, myDeliveryTag,
- framesize);
+ connection.getFrameMax());
if(ackExpected){
unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
}
@@ -391,7 +386,7 @@ bool Channel::get(Queue::shared_ptr queue, const string& destination, bool ackEx
void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag,
uint64_t deliveryTag)
{
- msg->deliver(*this, consumerTag, deliveryTag, framesize);
+ msg->deliver(*this, consumerTag, deliveryTag, connection.getFrameMax());
}
void Channel::handleMethodInContext(
diff --git a/cpp/src/qpid/broker/BrokerChannel.h b/cpp/src/qpid/broker/BrokerChannel.h
index 0529caed5f..9212e8f632 100644
--- a/cpp/src/qpid/broker/BrokerChannel.h
+++ b/cpp/src/qpid/broker/BrokerChannel.h
@@ -86,7 +86,6 @@ class Channel : public framing::ChannelAdapter,
uint32_t prefetchSize;
uint16_t prefetchCount;
Prefetch outstanding;
- uint32_t framesize;
NameGenerator tagGenerator;
std::list<DeliveryRecord> unacked;
sys::Mutex deliveryLock;
@@ -110,12 +109,7 @@ class Channel : public framing::ChannelAdapter,
void checkDtxTimeout();
public:
- Channel(Connection& parent,
- framing::ChannelId id,
- uint32_t framesize,
- MessageStore* const _store = 0,
- uint64_t stagingThreshold = 0);
-
+ Channel(Connection& parent, framing::ChannelId id, MessageStore* const store = 0);
~Channel();
bool isOpen() const { return opened; }
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index ce9e4865db..cdbcee1c69 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -41,51 +41,34 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) :
framemax(65536),
heartbeat(0),
client(0),
- stagingThreshold(broker.getStagingThreshold())
+ stagingThreshold(broker.getStagingThreshold()),
+ adapter(*this)
{}
-Queue::shared_ptr Connection::getQueue(const string& name, uint16_t channel){
- Queue::shared_ptr queue;
- if (name.empty()) {
- queue = getChannel(channel).getDefaultQueue();
- if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" );
- } else {
- queue = broker.getQueues().find(name);
- if (queue == 0) {
- throw ChannelException( 404, "Queue not found: " + name);
- }
- }
- return queue;
-}
-
-
Exchange::shared_ptr Connection::findExchange(const string& name){
return broker.getExchanges().get(name);
}
void Connection::received(framing::AMQFrame& frame){
- getChannel((frame.getChannel())).getHandlers().in->handle(frame);
+ if (frame.getChannel() == 0) {
+ adapter.handle(frame);
+ } else {
+ getChannel((frame.getChannel())).getHandlers().in->handle(frame);
+ }
}
void Connection::close(
ReplyCode code, const string& text, ClassId classId, MethodId methodId)
{
- client->close(code, text, classId, methodId);
+ adapter.close(code, text, classId, methodId);
getOutput().close();
}
void Connection::initiated(const framing::ProtocolInitiation& header) {
version = ProtocolVersion(header.getMajor(), header.getMinor());
- FieldTable properties;
- string mechanisms("PLAIN");
- string locales("en_US");
- getChannel(0).init(0, *out, getVersion());
- client = &getChannel(0).getAdapter().getProxy().getConnection();
- client->start(
- header.getMajor(), header.getMinor(),
- properties, mechanisms, locales);
+ adapter.init(header);
}
void Connection::idleOut(){}
@@ -117,10 +100,7 @@ void Connection::closeChannel(uint16_t id) {
Channel& Connection::getChannel(ChannelId id) {
ChannelMap::iterator i = channels.find(id);
if (i == channels.end()) {
- i = channels.insert(
- id, new Channel(
- *this, id, framemax, broker.getQueues().getStore(),
- broker.getStagingThreshold())).first;
+ i = channels.insert(id, new Channel(*this, id, &broker.getStore())).first;
}
return *i;
}
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index 259a74f808..a885ac4065 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -29,6 +29,7 @@
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/AMQP_ClientProxy.h"
+#include "qpid/framing/ChannelAdapter.h"
#include "qpid/sys/ConnectionOutputHandler.h"
#include "qpid/sys/ConnectionInputHandler.h"
#include "qpid/sys/TimeoutHandler.h"
@@ -36,6 +37,7 @@
#include "Broker.h"
#include "qpid/Exception.h"
#include "BrokerChannel.h"
+#include "ConnectionAdapter.h"
namespace qpid {
namespace broker {
@@ -66,14 +68,8 @@ class Connection : public sys::ConnectionInputHandler,
void setFrameMax(uint32_t fm) { framemax = fm; }
void setHeartbeat(uint16_t hb) { heartbeat = hb; }
+ void setStagingThreshold(uint64_t st) { stagingThreshold = st; }
- /**
- * Get named queue, never returns 0.
- * @return: named queue or default queue for channel if name=""
- * @exception: ChannelException if no queue of that name is found.
- * @exception: ConnectionException if name="" and channel has no default.
- */
- Queue::shared_ptr getQueue(const string& name, uint16_t channel);
Broker& broker;
std::vector<Queue::shared_ptr> exclusiveQueues;
@@ -97,7 +93,8 @@ class Connection : public sys::ConnectionInputHandler,
uint32_t framemax;
uint16_t heartbeat;
framing::AMQP_ClientProxy::Connection* client;
- const uint64_t stagingThreshold;
+ uint64_t stagingThreshold;
+ ConnectionAdapter adapter;
};
diff --git a/cpp/src/qpid/broker/ConnectionAdapter.cpp b/cpp/src/qpid/broker/ConnectionAdapter.cpp
new file mode 100644
index 0000000000..8a4450c881
--- /dev/null
+++ b/cpp/src/qpid/broker/ConnectionAdapter.cpp
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ConnectionAdapter.h"
+#include "Connection.h"
+#include "qpid/framing/MethodContext.h"
+
+using namespace qpid;
+using namespace qpid::broker;
+using qpid::framing::ReplyCode;
+using qpid::framing::ClassId;
+using qpid::framing::MethodId;
+using qpid::framing::MethodContext;
+using qpid::framing::FieldTable;
+
+void ConnectionAdapter::init(const framing::ProtocolInitiation& header) {
+ ChannelAdapter::init(0, handler->connection.getOutput(), handler->connection.getVersion());
+ FieldTable properties;
+ string mechanisms("PLAIN");
+ string locales("en_US");
+ handler->client.start(header.getMajor(), header.getMinor(), properties, mechanisms, locales);
+}
+
+void ConnectionAdapter::close(ReplyCode code, const string& text, ClassId classId, MethodId methodId)
+{
+ handler->client.close(code, text, classId, methodId);
+}
+
+void ConnectionAdapter::handleMethodInContext(
+ boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+ const MethodContext& context
+)
+{
+ try{
+ method->invoke(*this, context);
+ }catch(ConnectionException& e){
+ handler->client.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
+ }catch(std::exception& e){
+ handler->client.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
+ }
+}
+
+framing::AMQP_ServerOperations::ConnectionHandler* ConnectionAdapter::getConnectionHandler()
+{
+ return handler.get();
+}
+
+framing::ProtocolVersion ConnectionAdapter::getVersion() const
+{
+ return handler->connection.getVersion();
+}
+
+void ConnectionAdapter::handle(framing::AMQFrame& frame)
+{
+ getHandlers().in->handle(frame);
+}
+
+ConnectionAdapter::ConnectionAdapter(Connection& connection)
+{
+ handler = std::auto_ptr<Handler>(new Handler(connection, *this));
+}
+
+Handler::Handler(Connection& c, ConnectionAdapter& a) :
+ proxy(a), client(proxy.getConnection()), connection(c) {}
+
+
+void Handler::startOk(
+ const MethodContext&, const FieldTable& /*clientProperties*/,
+ const string& /*mechanism*/,
+ const string& /*response*/, const string& /*locale*/)
+{
+ client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), connection.getHeartbeat());
+}
+
+void Handler::secureOk(
+ const MethodContext&, const string& /*response*/){}
+
+void Handler::tuneOk(
+ const MethodContext&, uint16_t /*channelmax*/,
+ uint32_t framemax, uint16_t heartbeat)
+{
+ connection.setFrameMax(framemax);
+ connection.setHeartbeat(heartbeat);
+}
+
+void Handler::open(
+ const MethodContext& context, const string& /*virtualHost*/,
+ const string& /*capabilities*/, bool /*insist*/)
+{
+ string knownhosts;
+ client.openOk(
+ knownhosts, context.getRequestId());
+}
+
+
+void Handler::close(
+ const MethodContext& context, uint16_t /*replyCode*/, const string& /*replyText*/,
+ uint16_t /*classId*/, uint16_t /*methodId*/)
+{
+ client.closeOk(context.getRequestId());
+ connection.getOutput().close();
+}
+
+void Handler::closeOk(const MethodContext&){
+ connection.getOutput().close();
+}
diff --git a/cpp/src/qpid/broker/ConnectionAdapter.h b/cpp/src/qpid/broker/ConnectionAdapter.h
new file mode 100644
index 0000000000..383fbf84c0
--- /dev/null
+++ b/cpp/src/qpid/broker/ConnectionAdapter.h
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ConnectionAdapter_
+#define _ConnectionAdapter_
+
+#include <memory>
+#include "qpid/framing/amqp_types.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/AMQP_ClientProxy.h"
+#include "qpid/framing/ChannelAdapter.h"
+#include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/Exception.h"
+
+namespace qpid {
+namespace broker {
+
+class Connection;
+struct Handler;
+
+class ConnectionAdapter : public framing::ChannelAdapter, public framing::AMQP_ServerOperations
+{
+ std::auto_ptr<Handler> handler;
+public:
+ ConnectionAdapter(Connection& connection);
+ void init(const framing::ProtocolInitiation& header);
+ void close(framing::ReplyCode code, const std::string& text, framing::ClassId classId, framing::MethodId methodId);
+ void handle(framing::AMQFrame& frame);
+
+ //ChannelAdapter virtual methods:
+ void handleMethodInContext(boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+ const qpid::framing::MethodContext& context);
+ bool isOpen() const { return true; } //channel 0 is always open
+ //never needed:
+ void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>) {}
+ void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>) {}
+ void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>) {}
+
+ //AMQP_ServerOperations:
+ ConnectionHandler* getConnectionHandler();
+ ChannelHandler* getChannelHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
+ BasicHandler* getBasicHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
+ ExchangeHandler* getExchangeHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
+ BindingHandler* getBindingHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
+ QueueHandler* getQueueHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
+ TxHandler* getTxHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
+ MessageHandler* getMessageHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
+ AccessHandler* getAccessHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
+ FileHandler* getFileHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
+ StreamHandler* getStreamHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
+ DtxHandler* getDtxHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
+ TunnelHandler* getTunnelHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
+ DtxCoordinationHandler* getDtxCoordinationHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
+ DtxDemarcationHandler* getDtxDemarcationHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
+ framing::ProtocolVersion getVersion() const;
+};
+
+struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler
+{
+ framing::AMQP_ClientProxy proxy;
+ framing::AMQP_ClientProxy::Connection client;
+ Connection& connection;
+
+ Handler(Connection& connection, ConnectionAdapter& adapter);
+ void startOk(const framing::MethodContext& context,
+ const qpid::framing::FieldTable& clientProperties,
+ const std::string& mechanism, const std::string& response,
+ const std::string& locale);
+ void secureOk(const framing::MethodContext& context,
+ const std::string& response);
+ void tuneOk(const framing::MethodContext& context,
+ uint16_t channelMax,
+ uint32_t frameMax, uint16_t heartbeat);
+ void open(const framing::MethodContext& context,
+ const std::string& virtualHost,
+ const std::string& capabilities, bool insist);
+ void close(const framing::MethodContext& context, uint16_t replyCode,
+ const std::string& replyText,
+ uint16_t classId, uint16_t methodId);
+ void closeOk(const framing::MethodContext& context);
+};
+
+}}
+
+#endif
diff --git a/cpp/src/qpid/broker/HandlerImpl.h b/cpp/src/qpid/broker/HandlerImpl.h
index 338ebca4b7..008be10867 100644
--- a/cpp/src/qpid/broker/HandlerImpl.h
+++ b/cpp/src/qpid/broker/HandlerImpl.h
@@ -19,6 +19,7 @@
*
*/
+#include "Broker.h"
#include "BrokerChannel.h"
#include "qpid/framing/AMQP_ClientProxy.h"
@@ -30,8 +31,7 @@ class AMQP_ClientProxy;
namespace broker {
-class Broker;
-class Channel;
+ //class Channel;
class Connection;
/**
@@ -47,6 +47,28 @@ struct CoreRefs
Connection& connection;
Broker& broker;
framing::AMQP_ClientProxy proxy;
+
+ /**
+ * Get named queue, never returns 0.
+ * @return: named queue or default queue for channel if name=""
+ * @exception: ChannelException if no queue of that name is found.
+ * @exception: ConnectionException if name="" and channel has no default.
+ */
+ Queue::shared_ptr getQueue(const string& name) {
+ //Note: this can be removed soon as the default queue for channels is scrapped in 0-10
+ Queue::shared_ptr queue;
+ if (name.empty()) {
+ queue = channel.getDefaultQueue();
+ if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" );
+ } else {
+ queue = broker.getQueues().find(name);
+ if (queue == 0) {
+ throw ChannelException( 404, "Queue not found: " + name);
+ }
+ }
+ return queue;
+ }
+
};
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index 22011169a2..bbfcf209ad 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -123,7 +123,7 @@ MessageHandlerImpl::consume(const MethodContext& context,
bool exclusive,
const framing::FieldTable& filter )
{
- Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
+ Queue::shared_ptr queue = getQueue(queueName);
if(!destination.empty() && channel.exists(destination))
throw ConnectionException(530, "Consumer tags must be unique");
string tag = destination;
@@ -142,8 +142,7 @@ MessageHandlerImpl::get( const MethodContext& context,
const string& destination,
bool noAck )
{
- Queue::shared_ptr queue =
- connection.getQueue(queueName, context.channel->getId());
+ Queue::shared_ptr queue = getQueue(queueName);
if(channel.get(queue, destination, !noAck))
client.ok(context.getRequestId());
diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp
index 29ed1ae230..929105f6e3 100644
--- a/cpp/src/tests/BrokerChannelTest.cpp
+++ b/cpp/src/tests/BrokerChannelTest.cpp
@@ -154,7 +154,7 @@ class BrokerChannelTest : public CppUnit::TestCase
void testConsumerMgmt(){
Queue::shared_ptr queue(new Queue("my_queue"));
- Channel channel(connection, 0, 0, 0);
+ Channel channel(connection, 0, 0);
channel.open();
CPPUNIT_ASSERT(!channel.exists("my_consumer"));
@@ -179,7 +179,7 @@ class BrokerChannelTest : public CppUnit::TestCase
}
void testDeliveryNoAck(){
- Channel channel(connection, 7, 10000);
+ Channel channel(connection, 7);
channel.open();
const string data("abcdefghijklmn");
Message::shared_ptr msg(
@@ -209,7 +209,7 @@ class BrokerChannelTest : public CppUnit::TestCase
}
void testDeliveryAndRecovery(){
- Channel channel(connection, 7, 10000);
+ Channel channel(connection, 7);
channel.open();
const string data("abcdefghijklmn");
@@ -241,8 +241,9 @@ class BrokerChannelTest : public CppUnit::TestCase
void testStaging(){
MockMessageStore store;
- Channel channel(
- connection, 1, 1000/*framesize*/, &store, 10/*staging threshold*/);
+ connection.setFrameMax(1000);
+ connection.setStagingThreshold(10);
+ Channel channel(connection, 1, &store);
const string data[] = {"abcde", "fghij", "klmno"};
Message* msg = new BasicMessage(
@@ -335,7 +336,7 @@ class BrokerChannelTest : public CppUnit::TestCase
}
void testFlow(){
- Channel channel(connection, 7, 10000);
+ Channel channel(connection, 7);
channel.open();
//there will always be a connection-start frame
CPPUNIT_ASSERT_EQUAL((size_t) 1, handler.frames.size());