summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/configure.ac1
-rw-r--r--cpp/lib/broker/Broker.cpp5
-rw-r--r--cpp/lib/broker/BrokerAdapter.cpp94
-rw-r--r--cpp/lib/broker/BrokerAdapter.h8
-rw-r--r--cpp/lib/broker/BrokerChannel.h5
-rw-r--r--cpp/lib/broker/BrokerMessage.cpp1
-rw-r--r--cpp/lib/broker/Connection.cpp17
-rw-r--r--cpp/lib/broker/Connection.h30
-rw-r--r--cpp/lib/broker/ConnectionFactory.cpp4
-rw-r--r--cpp/lib/broker/ConnectionFactory.h2
-rw-r--r--cpp/lib/broker/InMemoryContent.cpp1
-rw-r--r--cpp/lib/broker/LazyLoadedContent.cpp1
-rw-r--r--cpp/lib/broker/MessageHandlerImpl.cpp18
-rw-r--r--cpp/lib/broker/MessageHandlerImpl.h36
-rw-r--r--cpp/lib/client/ClientChannel.cpp10
-rw-r--r--cpp/lib/client/ClientChannel.h30
-rw-r--r--cpp/lib/client/Connection.cpp10
-rw-r--r--cpp/lib/client/Connection.h34
-rw-r--r--cpp/lib/common/Makefile.am4
-rw-r--r--cpp/lib/common/framing/AMQFrame.cpp6
-rw-r--r--cpp/lib/common/framing/AMQFrame.h6
-rw-r--r--cpp/lib/common/framing/AMQMethodBody.cpp4
-rw-r--r--cpp/lib/common/framing/AMQMethodBody.h7
-rw-r--r--cpp/lib/common/framing/AMQResponseBody.cpp7
-rw-r--r--cpp/lib/common/framing/AMQResponseBody.h5
-rw-r--r--cpp/lib/common/framing/BodyHandler.cpp19
-rw-r--r--cpp/lib/common/framing/BodyHandler.h24
-rw-r--r--cpp/lib/common/framing/ChannelAdapter.cpp70
-rw-r--r--cpp/lib/common/framing/ChannelAdapter.h90
-rw-r--r--cpp/lib/common/framing/MethodContext.h32
-rw-r--r--cpp/lib/common/framing/OutputHandler.h2
-rw-r--r--cpp/lib/common/sys/ConnectionInputHandlerFactory.h4
-rw-r--r--cpp/lib/common/sys/ConnectionOutputHandler.h (renamed from cpp/lib/common/sys/SessionContext.h)6
-rw-r--r--cpp/lib/common/sys/apr/LFSessionContext.h4
-rw-r--r--cpp/lib/common/sys/posix/EventChannelAcceptor.cpp2
-rw-r--r--cpp/lib/common/sys/posix/EventChannelConnection.h8
-rw-r--r--cpp/lib/common/sys/posix/check.h2
-rw-r--r--cpp/tests/ChannelTest.cpp5
-rw-r--r--cpp/tests/InMemoryContentTest.cpp1
-rw-r--r--cpp/tests/LazyLoadedContentTest.cpp1
-rw-r--r--cpp/tests/MessageBuilderTest.cpp3
-rw-r--r--cpp/tests/MessageTest.cpp1
-rw-r--r--cpp/tests/MockConnectionInputHandler.h2
43 files changed, 422 insertions, 200 deletions
diff --git a/cpp/configure.ac b/cpp/configure.ac
index 058bce148f..7359ba34e6 100644
--- a/cpp/configure.ac
+++ b/cpp/configure.ac
@@ -66,6 +66,7 @@ if test "${enableval}" = yes; then
gl_COMPILER_FLAGS(-Wvolatile-register-var)
gl_COMPILER_FLAGS(-Winvalid-pch)
gl_COMPILER_FLAGS(-Wno-system-headers)
+ gl_COMPILER_FLAGS(-Woverloaded-virtual)
AC_SUBST([WARNING_CFLAGS], [$COMPILER_FLAGS])
AC_DEFINE([lint], 1, [Define to 1 if the compiler is checking for lint.])
COMPILER_FLAGS=
diff --git a/cpp/lib/broker/Broker.cpp b/cpp/lib/broker/Broker.cpp
index 079eb5fd73..b9e7990861 100644
--- a/cpp/lib/broker/Broker.cpp
+++ b/cpp/lib/broker/Broker.cpp
@@ -30,7 +30,6 @@
#include "NullMessageStore.h"
#include "ProtocolInitiation.h"
#include "Connection.h"
-#include "sys/SessionContext.h"
#include "sys/ConnectionInputHandler.h"
#include "sys/ConnectionInputHandlerFactory.h"
#include "sys/TimeoutHandler.h"
@@ -97,7 +96,9 @@ void Broker::shutdown() {
acceptor->shutdown();
}
-Broker::~Broker() { }
+Broker::~Broker() {
+ shutdown();
+}
const int16_t Broker::DEFAULT_PORT(5672);
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp
index 0d34868710..fda7d15784 100644
--- a/cpp/lib/broker/BrokerAdapter.cpp
+++ b/cpp/lib/broker/BrokerAdapter.cpp
@@ -181,9 +181,9 @@ class BrokerAdapter::ServerOps : public AMQP_ServerOperations
};
void BrokerAdapter::ServerOps::ConnectionHandlerImpl::startOk(
- const MethodContext& , const FieldTable& /*clientProperties*/, const string& /*mechanism*/,
+ const MethodContext& context , const FieldTable& /*clientProperties*/, const string& /*mechanism*/,
const string& /*response*/, const string& /*locale*/){
- connection.client->getConnection().tune(0, 100, connection.framemax, connection.heartbeat);
+ connection.client->getConnection().tune(context, 100, connection.framemax, connection.heartbeat);
}
void BrokerAdapter::ServerOps::ConnectionHandlerImpl::secureOk(const MethodContext&, const string& /*response*/){}
@@ -193,40 +193,40 @@ void BrokerAdapter::ServerOps::ConnectionHandlerImpl::tuneOk(const MethodContext
connection.heartbeat = heartbeat;
}
-void BrokerAdapter::ServerOps::ConnectionHandlerImpl::open(const MethodContext&, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
+void BrokerAdapter::ServerOps::ConnectionHandlerImpl::open(const MethodContext& context, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
string knownhosts;
- connection.client->getConnection().openOk(0, knownhosts);
+ connection.client->getConnection().openOk(context, knownhosts);
}
void BrokerAdapter::ServerOps::ConnectionHandlerImpl::close(
- const MethodContext&, u_int16_t /*replyCode*/, const string& /*replyText*/,
+ const MethodContext& context, u_int16_t /*replyCode*/, const string& /*replyText*/,
u_int16_t /*classId*/, u_int16_t /*methodId*/)
{
- connection.client->getConnection().closeOk(0);
- connection.context->close();
+ connection.client->getConnection().closeOk(context);
+ connection.getOutput().close();
}
void BrokerAdapter::ServerOps::ConnectionHandlerImpl::closeOk(const MethodContext&){
- connection.context->close();
+ connection.getOutput().close();
}
void BrokerAdapter::ServerOps::ChannelHandlerImpl::open(
- const MethodContext&, const string& /*outOfBand*/){
+ const MethodContext& context, const string& /*outOfBand*/){
// FIXME aconway 2007-01-17: Assertions on all channel methods,
assertChannelNonZero(channel.getId());
if (channel.isOpen())
throw ConnectionException(504, "Channel already open");
channel.open();
// FIXME aconway 2007-01-04: provide valid channel Id as per ampq 0-9
- connection.client->getChannel().openOk(channel.getId(), std::string()/* ID */);
+ connection.client->getChannel().openOk(context, std::string()/* ID */);
}
void BrokerAdapter::ServerOps::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){}
void BrokerAdapter::ServerOps::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){}
-void BrokerAdapter::ServerOps::ChannelHandlerImpl::close(const MethodContext&, u_int16_t /*replyCode*/, const string& /*replyText*/,
+void BrokerAdapter::ServerOps::ChannelHandlerImpl::close(const MethodContext& context, u_int16_t /*replyCode*/, const string& /*replyText*/,
u_int16_t /*classId*/, u_int16_t /*methodId*/){
- connection.client->getChannel().closeOk(channel.getId());
+ connection.client->getChannel().closeOk(context);
// FIXME aconway 2007-01-18: Following line destroys this. Ugly.
connection.closeChannel(channel.getId());
}
@@ -235,7 +235,7 @@ void BrokerAdapter::ServerOps::ChannelHandlerImpl::closeOk(const MethodContext&)
-void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(const MethodContext&, u_int16_t /*ticket*/, const string& exchange, const string& type,
+void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, const string& type,
bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
const FieldTable& /*arguments*/){
@@ -258,19 +258,19 @@ void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(const MethodContext&
}
}
if(!nowait){
- connection.client->getExchange().declareOk(channel.getId());
+ connection.client->getExchange().declareOk(context);
}
}
-void BrokerAdapter::ServerOps::ExchangeHandlerImpl::delete_(const MethodContext&, u_int16_t /*ticket*/,
+void BrokerAdapter::ServerOps::ExchangeHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/,
const string& exchange, bool /*ifUnused*/, bool nowait){
//TODO: implement unused
broker.getExchanges().destroy(exchange);
- if(!nowait) connection.client->getExchange().deleteOk(channel.getId());
+ if(!nowait) connection.client->getExchange().deleteOk(context);
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(const MethodContext&, u_int16_t /*ticket*/, const string& name,
+void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& name,
bool passive, bool durable, bool exclusive,
bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
Queue::shared_ptr queue;
@@ -301,11 +301,11 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(const MethodContext&, u
}
if (!nowait) {
string queueName = queue->getName();
- connection.client->getQueue().declareOk(channel.getId(), queueName, queue->getMessageCount(), queue->getConsumerCount());
+ connection.client->getQueue().declareOk(context, queueName, queue->getMessageCount(), queue->getConsumerCount());
}
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(const MethodContext&, u_int16_t /*ticket*/, const string& queueName,
+void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName,
const string& exchangeName, const string& routingKey, bool nowait,
const FieldTable& arguments){
@@ -314,7 +314,7 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(const MethodContext&, u_in
if(exchange){
string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
exchange->bind(queue, exchangeRoutingKey, &arguments);
- if(!nowait) connection.client->getQueue().bindOk(channel.getId());
+ if(!nowait) connection.client->getQueue().bindOk(context);
}else{
throw ChannelException(
404, "Bind failed. No such exchange: " + exchangeName);
@@ -341,14 +341,14 @@ BrokerAdapter::ServerOps::QueueHandlerImpl::unbind(
connection.client->getQueue().unbindOk(channel.getId());
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(const MethodContext&, u_int16_t /*ticket*/, const string& queueName, bool nowait){
+void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
int count = queue->purge();
- if(!nowait) connection.client->getQueue().purgeOk(channel.getId(), count);
+ if(!nowait) connection.client->getQueue().purgeOk(context, count);
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(const MethodContext&, u_int16_t /*ticket*/, const string& queue,
+void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& queue,
bool ifUnused, bool ifEmpty, bool nowait){
ChannelException error(0, "");
int count(0);
@@ -368,21 +368,21 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(const MethodContext&, u
broker.getQueues().destroy(queue);
}
- if(!nowait) connection.client->getQueue().deleteOk(channel.getId(), count);
+ if(!nowait) connection.client->getQueue().deleteOk(context, count);
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::qos(const MethodContext&, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
+void BrokerAdapter::ServerOps::BasicHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
- connection.client->getBasic().qosOk(channel.getId());
+ connection.client->getBasic().qosOk(context);
}
void BrokerAdapter::ServerOps::BasicHandlerImpl::consume(
- const MethodContext&, u_int16_t /*ticket*/,
+ const MethodContext& context, u_int16_t /*ticket*/,
const string& queueName, const string& consumerTag,
bool noLocal, bool noAck, bool exclusive,
bool nowait, const FieldTable& fields)
@@ -398,7 +398,7 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::consume(
channel.consume(
newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields);
- if(!nowait) connection.client->getBasic().consumeOk(channel.getId(), newTag);
+ if(!nowait) connection.client->getBasic().consumeOk(context, newTag);
//allow messages to be dispatched if required as there is now a consumer:
queue->dispatch();
@@ -409,10 +409,10 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::consume(
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::cancel(const MethodContext&, const string& consumerTag, bool nowait){
+void BrokerAdapter::ServerOps::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){
channel.cancel(consumerTag);
- if(!nowait) connection.client->getBasic().cancelOk(channel.getId(), consumerTag);
+ if(!nowait) connection.client->getBasic().cancelOk(context, consumerTag);
}
void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(const MethodContext&, u_int16_t /*ticket*/,
@@ -429,12 +429,12 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(const MethodContext&, u
}
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::get(const MethodContext&, u_int16_t /*ticket*/, const string& queueName, bool noAck){
+void BrokerAdapter::ServerOps::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
if(!connection.getChannel(channel.getId()).get(queue, !noAck)){
string clusterId;//not used, part of an imatix hack
- connection.client->getBasic().getEmpty(channel.getId(), clusterId);
+ connection.client->getBasic().getEmpty(context, clusterId);
}
}
@@ -452,20 +452,20 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::recover(const MethodContext&, b
channel.recover(requeue);
}
-void BrokerAdapter::ServerOps::TxHandlerImpl::select(const MethodContext&){
+void BrokerAdapter::ServerOps::TxHandlerImpl::select(const MethodContext& context){
channel.begin();
- connection.client->getTx().selectOk(channel.getId());
+ connection.client->getTx().selectOk(context);
}
-void BrokerAdapter::ServerOps::TxHandlerImpl::commit(const MethodContext&){
+void BrokerAdapter::ServerOps::TxHandlerImpl::commit(const MethodContext& context){
channel.commit();
- connection.client->getTx().commitOk(channel.getId());
+ connection.client->getTx().commitOk(context);
}
-void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(const MethodContext&){
+void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(const MethodContext& context){
channel.rollback();
- connection.client->getTx().rollbackOk(channel.getId());
+ connection.client->getTx().rollbackOk(context);
channel.recover(false);
}
@@ -499,6 +499,7 @@ BrokerAdapter::ServerOps::ChannelHandlerImpl::resume(
BrokerAdapter::BrokerAdapter(
Channel* ch, Connection& c, Broker& b
) :
+ ChannelAdapter(c.getOutput(), ch->getId()),
channel(ch),
connection(c),
broker(b),
@@ -507,24 +508,25 @@ BrokerAdapter::BrokerAdapter(
assert(ch);
}
-void BrokerAdapter::handleMethod(
- boost::shared_ptr<qpid::framing::AMQMethodBody> method)
+void BrokerAdapter::handleMethodInContext(
+ boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+ const MethodContext& context
+)
{
try{
- // FIXME aconway 2007-01-17: invoke to take Channel&?
- method->invoke(*serverOps, channel->getId());
+ method->invoke(*serverOps, context);
}catch(ChannelException& e){
- connection.closeChannel(channel->getId());
+ connection.closeChannel(getId());
connection.client->getChannel().close(
- channel->getId(), e.code, e.toString(),
+ context, e.code, e.toString(),
method->amqpClassId(), method->amqpMethodId());
}catch(ConnectionException& e){
connection.client->getConnection().close(
- 0, e.code, e.toString(),
+ context, e.code, e.toString(),
method->amqpClassId(), method->amqpMethodId());
}catch(std::exception& e){
connection.client->getConnection().close(
- 0, 541/*internal error*/, e.what(),
+ context, 541/*internal error*/, e.what(),
method->amqpClassId(), method->amqpMethodId());
}
}
diff --git a/cpp/lib/broker/BrokerAdapter.h b/cpp/lib/broker/BrokerAdapter.h
index 5309d767f5..37eba7d52b 100644
--- a/cpp/lib/broker/BrokerAdapter.h
+++ b/cpp/lib/broker/BrokerAdapter.h
@@ -22,6 +22,7 @@
#include "AMQP_ServerOperations.h"
#include "BodyHandler.h"
#include "BrokerChannel.h"
+#include "amqp_types.h"
namespace qpid {
namespace broker {
@@ -40,19 +41,22 @@ class Broker;
*
* Owns a channel, has references to Connection and Broker.
*/
-class BrokerAdapter : public qpid::framing::BodyHandler
+class BrokerAdapter : public qpid::framing::ChannelAdapter
{
public:
// FIXME aconway 2007-01-18: takes ownership, should pass auto_ptr<Channel>
BrokerAdapter(Channel* ch, Connection&, Broker&);
Channel& getChannel() { return *channel; }
- void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody>);
void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>);
void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>);
void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>);
private:
+ void handleMethodInContext(
+ boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+ const framing::MethodContext& context);
+
class ServerOps;
std::auto_ptr<Channel> channel;
diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h
index dabf6ebe40..7cfb96241d 100644
--- a/cpp/lib/broker/BrokerChannel.h
+++ b/cpp/lib/broker/BrokerChannel.h
@@ -45,7 +45,9 @@
#include <OutputHandler.h>
#include <AMQContentBody.h>
#include <AMQHeaderBody.h>
+#include <AMQHeartbeatBody.h>
#include <BasicPublishBody.h>
+#include "ChannelAdapter.h"
namespace qpid {
namespace broker {
@@ -56,8 +58,7 @@ using qpid::framing::string;
* Maintains state for an AMQP channel. Handles incoming and
* outgoing messages for that channel.
*/
-class Channel : private MessageBuilder::CompletionHandler
-{
+class Channel : private MessageBuilder::CompletionHandler {
class ConsumerImpl : public virtual Consumer
{
Channel* parent;
diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp
index 6ba2131a74..07b14a4eff 100644
--- a/cpp/lib/broker/BrokerMessage.cpp
+++ b/cpp/lib/broker/BrokerMessage.cpp
@@ -26,6 +26,7 @@
#include <MessageStore.h>
#include <BasicDeliverBody.h>
#include <BasicGetOkBody.h>
+#include "AMQFrame.h"
using namespace boost;
using namespace qpid::broker;
diff --git a/cpp/lib/broker/Connection.cpp b/cpp/lib/broker/Connection.cpp
index 3db5bcd074..dc8bdeda05 100644
--- a/cpp/lib/broker/Connection.cpp
+++ b/cpp/lib/broker/Connection.cpp
@@ -31,12 +31,12 @@ using namespace qpid::sys;
namespace qpid {
namespace broker {
-Connection::Connection(SessionContext* context_, Broker& broker_) :
- context(context_),
+Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) :
framemax(65536),
heartbeat(0),
broker(broker_),
- settings(broker.getTimeout(), broker.getStagingThreshold())
+ settings(broker.getTimeout(), broker.getStagingThreshold()),
+ out(out_)
{}
Queue::shared_ptr Connection::getQueue(const string& name, u_int16_t channel){
@@ -68,14 +68,15 @@ void Connection::initiated(qpid::framing::ProtocolInitiation* header) {
// TODO aconway 2007-01-16: correct error code.
throw ConnectionException(0, "Connection initiated twice");
client.reset(new qpid::framing::AMQP_ClientProxy(
- context, header->getMajor(), header->getMinor()));
+ out, header->getMajor(), header->getMinor()));
FieldTable properties;
string mechanisms("PLAIN");
string locales("en_US");
- // TODO aconway 2007-01-16: Move to adapter.
+ // TODO aconway 2007-01-16: Client call, move to adapter.
client->getConnection().start(
- 0, header->getMajor(), header->getMinor(), properties,
- mechanisms, locales);
+ MethodContext(0, out),
+ header->getMajor(), header->getMinor(),
+ properties, mechanisms, locales);
}
void Connection::idleOut(){}
@@ -105,7 +106,7 @@ BrokerAdapter& Connection::getAdapter(u_int16_t id) {
AdapterMap::iterator i = adapters.find(id);
if (i == adapters.end()) {
Channel* ch=new Channel(
- client->getProtocolVersion(), context, id,
+ client->getProtocolVersion(), out, id,
framemax, broker.getQueues().getStore(),
settings.stagingThreshold);
BrokerAdapter* adapter = new BrokerAdapter(ch, *this, broker);
diff --git a/cpp/lib/broker/Connection.h b/cpp/lib/broker/Connection.h
index 90346b7c9d..d5d90d4830 100644
--- a/cpp/lib/broker/Connection.h
+++ b/cpp/lib/broker/Connection.h
@@ -29,7 +29,7 @@
#include <AMQFrame.h>
#include <AMQP_ClientProxy.h>
#include <AMQP_ServerOperations.h>
-#include <sys/SessionContext.h>
+#include <sys/ConnectionOutputHandler.h>
#include <sys/ConnectionInputHandler.h>
#include <sys/TimeoutHandler.h>
#include "Broker.h"
@@ -50,18 +50,8 @@ class Settings {
class Connection : public qpid::sys::ConnectionInputHandler,
public ConnectionToken
{
- typedef boost::ptr_map<u_int16_t, BrokerAdapter> AdapterMap;
-
- // FIXME aconway 2007-01-16: on broker.
- typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
- Exchange::shared_ptr findExchange(const string& name);
-
- BrokerAdapter& getAdapter(u_int16_t id);
-
- AdapterMap adapters;
-
public:
- Connection(qpid::sys::SessionContext* context, Broker& broker);
+ Connection(qpid::sys::ConnectionOutputHandler* out, Broker& broker);
// ConnectionInputHandler methods
void received(qpid::framing::AMQFrame* frame);
void initiated(qpid::framing::ProtocolInitiation* header);
@@ -69,8 +59,9 @@ class Connection : public qpid::sys::ConnectionInputHandler,
void idleIn();
void closed();
+ qpid::sys::ConnectionOutputHandler& getOutput() { return *out; }
+
// FIXME aconway 2007-01-16: encapsulate.
- qpid::sys::SessionContext* context;
u_int32_t framemax;
u_int16_t heartbeat;
Broker& broker;
@@ -91,6 +82,19 @@ class Connection : public qpid::sys::ConnectionInputHandler,
Channel& newChannel(u_int16_t channel);
Channel& getChannel(u_int16_t channel);
void closeChannel(u_int16_t channel);
+
+ private:
+ typedef boost::ptr_map<u_int16_t, BrokerAdapter> AdapterMap;
+
+ // FIXME aconway 2007-01-16: on broker.
+ typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
+ Exchange::shared_ptr findExchange(const string& name);
+
+ BrokerAdapter& getAdapter(u_int16_t id);
+
+ AdapterMap adapters;
+ qpid::sys::ConnectionOutputHandler* out;
+
};
}}
diff --git a/cpp/lib/broker/ConnectionFactory.cpp b/cpp/lib/broker/ConnectionFactory.cpp
index 3c4c7cd724..20485dd0e1 100644
--- a/cpp/lib/broker/ConnectionFactory.cpp
+++ b/cpp/lib/broker/ConnectionFactory.cpp
@@ -35,9 +35,9 @@ ConnectionFactory::~ConnectionFactory()
}
qpid::sys::ConnectionInputHandler*
-ConnectionFactory::create(qpid::sys::SessionContext* ctxt)
+ConnectionFactory::create(qpid::sys::ConnectionOutputHandler* out)
{
- return new Connection(ctxt, broker);
+ return new Connection(out, broker);
}
}} // namespace qpid::broker
diff --git a/cpp/lib/broker/ConnectionFactory.h b/cpp/lib/broker/ConnectionFactory.h
index fe8052ed9c..b699dd0af4 100644
--- a/cpp/lib/broker/ConnectionFactory.h
+++ b/cpp/lib/broker/ConnectionFactory.h
@@ -32,7 +32,7 @@ class ConnectionFactory : public qpid::sys::ConnectionInputHandlerFactory
public:
ConnectionFactory(Broker& b);
- virtual qpid::sys::ConnectionInputHandler* create(qpid::sys::SessionContext* ctxt);
+ virtual qpid::sys::ConnectionInputHandler* create(qpid::sys::ConnectionOutputHandler* ctxt);
virtual ~ConnectionFactory();
diff --git a/cpp/lib/broker/InMemoryContent.cpp b/cpp/lib/broker/InMemoryContent.cpp
index 07af8633e5..e6db6b0539 100644
--- a/cpp/lib/broker/InMemoryContent.cpp
+++ b/cpp/lib/broker/InMemoryContent.cpp
@@ -19,6 +19,7 @@
*
*/
#include <InMemoryContent.h>
+#include "AMQFrame.h"
using namespace qpid::broker;
using namespace qpid::framing;
diff --git a/cpp/lib/broker/LazyLoadedContent.cpp b/cpp/lib/broker/LazyLoadedContent.cpp
index ec1ca3e195..3bf1b374ea 100644
--- a/cpp/lib/broker/LazyLoadedContent.cpp
+++ b/cpp/lib/broker/LazyLoadedContent.cpp
@@ -19,6 +19,7 @@
*
*/
#include <LazyLoadedContent.h>
+#include "AMQFrame.h"
using namespace qpid::broker;
using namespace qpid::framing;
diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp
index 07ede4097a..33f7a63d45 100644
--- a/cpp/lib/broker/MessageHandlerImpl.cpp
+++ b/cpp/lib/broker/MessageHandlerImpl.cpp
@@ -64,7 +64,7 @@ MessageHandlerImpl::close(const MethodContext&,
}
void
-MessageHandlerImpl::consume(const MethodContext&,
+MessageHandlerImpl::consume(const MethodContext& context,
u_int16_t /*ticket*/,
const string& queueName,
const string& destination,
@@ -85,7 +85,7 @@ MessageHandlerImpl::consume(const MethodContext&,
string newTag = destination;
channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter);
- connection.client->getMessageHandler()->ok(channel.getId());
+ connection.client->getMessageHandler()->ok(context);
//allow messages to be dispatched if required as there is now a consumer:
queue->dispatch();
@@ -102,7 +102,7 @@ MessageHandlerImpl::empty( const MethodContext& )
}
void
-MessageHandlerImpl::get( const MethodContext&,
+MessageHandlerImpl::get( const MethodContext& context,
u_int16_t /*ticket*/,
const string& queueName,
const string& /*destination*/,
@@ -110,12 +110,12 @@ MessageHandlerImpl::get( const MethodContext&,
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
- Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
+ Queue::shared_ptr queue =
+ connection.getQueue(queueName, context.channelId);
// FIXME: get is probably Basic specific
- if(!connection.getChannel(channel.getId()).get(queue, !noAck)){
-
- connection.client->getMessageHandler()->empty(channel.getId());
+ if(!channel.get(queue, !noAck)){
+ connection.client->getMessageHandler()->empty(context);
}
}
@@ -141,7 +141,7 @@ MessageHandlerImpl::open(const MethodContext&,
}
void
-MessageHandlerImpl::qos(const MethodContext&,
+MessageHandlerImpl::qos(const MethodContext& context,
u_int32_t prefetchSize,
u_int16_t prefetchCount,
bool /*global*/ )
@@ -152,7 +152,7 @@ MessageHandlerImpl::qos(const MethodContext&,
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
- connection.client->getMessageHandler()->ok(channel.getId());
+ connection.client->getMessageHandler()->ok(context);
}
void
diff --git a/cpp/lib/broker/MessageHandlerImpl.h b/cpp/lib/broker/MessageHandlerImpl.h
index d97d29153a..0eb9e119f5 100644
--- a/cpp/lib/broker/MessageHandlerImpl.h
+++ b/cpp/lib/broker/MessageHandlerImpl.h
@@ -37,62 +37,62 @@ class MessageHandlerImpl : public qpid::framing::AMQP_ServerOperations::MessageH
MessageHandlerImpl(Channel& ch, Connection& c, Broker& b)
: channel(ch), connection(c), broker(b) {}
- void append(const qpid::framing::MethodContext&,
+ void append(const framing::MethodContext&,
const std::string& reference,
const std::string& bytes );
- void cancel(const qpid::framing::MethodContext&,
+ void cancel(const framing::MethodContext&,
const std::string& destination );
- void checkpoint(const qpid::framing::MethodContext&,
+ void checkpoint(const framing::MethodContext&,
const std::string& reference,
const std::string& identifier );
- void close(const qpid::framing::MethodContext&,
+ void close(const framing::MethodContext&,
const std::string& reference );
- void consume(const qpid::framing::MethodContext&,
+ void consume(const framing::MethodContext&,
u_int16_t ticket,
const std::string& queue,
const std::string& destination,
bool noLocal,
bool noAck,
bool exclusive,
- const qpid::framing::FieldTable& filter );
+ const framing::FieldTable& filter );
- void empty( const qpid::framing::MethodContext& );
+ void empty( const framing::MethodContext& );
- void get(const qpid::framing::MethodContext&,
+ void get(const framing::MethodContext&,
u_int16_t ticket,
const std::string& queue,
const std::string& destination,
bool noAck );
- void offset(const qpid::framing::MethodContext&,
+ void offset(const framing::MethodContext&,
u_int64_t value );
- void ok( const qpid::framing::MethodContext& );
+ void ok( const framing::MethodContext& );
- void open(const qpid::framing::MethodContext&,
+ void open(const framing::MethodContext&,
const std::string& reference );
- void qos(const qpid::framing::MethodContext&,
+ void qos(const framing::MethodContext&,
u_int32_t prefetchSize,
u_int16_t prefetchCount,
bool global );
- void recover(const qpid::framing::MethodContext&,
+ void recover(const framing::MethodContext&,
bool requeue );
- void reject(const qpid::framing::MethodContext&,
+ void reject(const framing::MethodContext&,
u_int16_t code,
const std::string& text );
- void resume(const qpid::framing::MethodContext&,
+ void resume(const framing::MethodContext&,
const std::string& reference,
const std::string& identifier );
- void transfer(const qpid::framing::MethodContext&,
+ void transfer(const framing::MethodContext&,
u_int16_t ticket,
const std::string& destination,
bool redelivered,
@@ -113,8 +113,8 @@ class MessageHandlerImpl : public qpid::framing::AMQP_ServerOperations::MessageH
const std::string& appId,
const std::string& transactionId,
const std::string& securityToken,
- const qpid::framing::FieldTable& applicationHeaders,
- qpid::framing::Content body );
+ const framing::FieldTable& applicationHeaders,
+ framing::Content body );
};
}} // namespace qpid::broker
diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp
index af26990d8a..d9edb2f390 100644
--- a/cpp/lib/client/ClientChannel.cpp
+++ b/cpp/lib/client/ClientChannel.cpp
@@ -256,6 +256,16 @@ void Channel::rollback(){
sendAndReceive(frame, method_bodies.tx_rollback_ok);
}
+void Channel::handleRequest(AMQRequestBody::shared_ptr body) {
+ // FIXME aconway 2007-01-19: request/response handling.
+ handleMethod(body);
+}
+
+void Channel::handleResponse(AMQResponseBody::shared_ptr body) {
+ // FIXME aconway 2007-01-19: request/response handling.
+ handleMethod(body);
+}
+
void Channel::handleMethod(AMQMethodBody::shared_ptr body){
//channel.flow, channel.close, basic.deliver, basic.return or a response to a synchronous request
if(responses.isWaiting()){
diff --git a/cpp/lib/client/ClientChannel.h b/cpp/lib/client/ClientChannel.h
index 5beda0296e..e7bab8b4ee 100644
--- a/cpp/lib/client/ClientChannel.h
+++ b/cpp/lib/client/ClientChannel.h
@@ -67,7 +67,9 @@ namespace client {
*
* \ingroup clientapi
*/
- class Channel : private virtual qpid::framing::BodyHandler, public virtual qpid::sys::Runnable{
+ class Channel : private virtual framing::BodyHandler,
+ public virtual sys::Runnable
+ {
struct Consumer{
MessageListener* listener;
int ackMode;
@@ -78,36 +80,38 @@ namespace client {
u_int16_t id;
Connection* con;
- qpid::sys::Thread dispatcher;
- qpid::framing::OutputHandler* out;
+ sys::Thread dispatcher;
+ framing::OutputHandler* out;
IncomingMessage* incoming;
ResponseHandler responses;
std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume
IncomingMessage* retrieved;//holds response to basic.get
- qpid::sys::Monitor dispatchMonitor;
- qpid::sys::Monitor retrievalMonitor;
+ sys::Monitor dispatchMonitor;
+ sys::Monitor retrievalMonitor;
std::map<std::string, Consumer*> consumers;
ReturnedMessageHandler* returnsHandler;
bool closed;
u_int16_t prefetch;
const bool transactional;
- qpid::framing::ProtocolVersion version;
+ framing::ProtocolVersion version;
void enqueue();
void retrieve(Message& msg);
IncomingMessage* dequeue();
void dispatch();
void stop();
- void sendAndReceive(qpid::framing::AMQFrame* frame, const qpid::framing::AMQMethodBody& body);
+ void sendAndReceive(framing::AMQFrame* frame, const framing::AMQMethodBody& body);
void deliver(Consumer* consumer, Message& msg);
void setQos();
void cancelAll();
- virtual void handleMethod(qpid::framing::AMQMethodBody::shared_ptr body);
- virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body);
- virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body);
- virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
+ virtual void handleMethod(framing::AMQMethodBody::shared_ptr body);
+ virtual void handleHeader(framing::AMQHeaderBody::shared_ptr body);
+ virtual void handleContent(framing::AMQContentBody::shared_ptr body);
+ virtual void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body);
+ void handleRequest(framing::AMQRequestBody::shared_ptr);
+ void handleResponse(framing::AMQResponseBody::shared_ptr);
public:
/**
@@ -185,7 +189,7 @@ namespace client {
* is received from the broker
*/
void bind(const Exchange& exchange, const Queue& queue, const std::string& key,
- const qpid::framing::FieldTable& args, bool synch = true);
+ const framing::FieldTable& args, bool synch = true);
/**
* Creates a 'consumer' for a queue. Messages in (or arriving
* at) that queue will be delivered to consumers
@@ -216,7 +220,7 @@ namespace client {
void consume(
Queue& queue, std::string& tag, MessageListener* listener,
int ackMode = NO_ACK, bool noLocal = false, bool synch = true,
- const qpid::framing::FieldTable* fields = 0);
+ const framing::FieldTable* fields = 0);
/**
* Cancels a subscription previously set up through a call to consume().
diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp
index dd1e372095..1ae317db62 100644
--- a/cpp/lib/client/Connection.cpp
+++ b/cpp/lib/client/Connection.cpp
@@ -184,6 +184,16 @@ void Connection::handleFrame(AMQFrame* frame){
}
}
+void Connection::handleRequest(AMQRequestBody::shared_ptr body) {
+ // FIXME aconway 2007-01-19: request/response handling.
+ handleMethod(body);
+}
+
+void Connection::handleResponse(AMQResponseBody::shared_ptr body) {
+ // FIXME aconway 2007-01-19: request/response handling.
+ handleMethod(body);
+}
+
void Connection::handleMethod(AMQMethodBody::shared_ptr body){
//connection.close, basic.deliver, basic.return or a response to a synchronous request
if(responses.isWaiting()){
diff --git a/cpp/lib/client/Connection.h b/cpp/lib/client/Connection.h
index 21e2fb90a2..9c9b067f88 100644
--- a/cpp/lib/client/Connection.h
+++ b/cpp/lib/client/Connection.h
@@ -66,7 +66,8 @@ namespace client {
class Connection : public virtual qpid::framing::InputHandler,
public virtual qpid::sys::TimeoutHandler,
public virtual qpid::sys::ShutdownHandler,
- private virtual qpid::framing::BodyHandler{
+ private virtual qpid::framing::BodyHandler
+ {
typedef std::map<int, Channel*>::iterator iterator;
@@ -80,20 +81,25 @@ namespace client {
qpid::framing::OutputHandler* out;
ResponseHandler responses;
volatile bool closed;
- qpid::framing::ProtocolVersion version;
- qpid::framing::Requester requester;
- qpid::framing::Responder responder;
+ framing::ProtocolVersion version;
+ framing::Requester requester;
+ framing::Responder responder;
- void channelException(Channel* channel, qpid::framing::AMQMethodBody* body, QpidError& e);
+ void channelException(Channel* channel, framing::AMQMethodBody* body, QpidError& e);
void error(int code, const std::string& msg, int classid = 0, int methodid = 0);
void closeChannel(Channel* channel, u_int16_t code, std::string& text, u_int16_t classId = 0, u_int16_t methodId = 0);
- void sendAndReceive(qpid::framing::AMQFrame* frame, const qpid::framing::AMQMethodBody& body);
-
- virtual void handleMethod(qpid::framing::AMQMethodBody::shared_ptr body);
- virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body);
- virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body);
- virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
- void handleFrame(qpid::framing::AMQFrame* frame);
+ void sendAndReceive(framing::AMQFrame* frame, const framing::AMQMethodBody& body);
+
+ // FIXME aconway 2007-01-19: Use channel(0) not connection
+ // to handle channel 0 requests. Remove handler methods.
+ //
+ void handleRequest(framing::AMQRequestBody::shared_ptr);
+ void handleResponse(framing::AMQResponseBody::shared_ptr);
+ void handleMethod(framing::AMQMethodBody::shared_ptr);
+ void handleHeader(framing::AMQHeaderBody::shared_ptr);
+ void handleContent(framing::AMQContentBody::shared_ptr);
+ void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr);
+ void handleFrame(framing::AMQFrame* frame);
public:
/**
@@ -110,7 +116,7 @@ namespace client {
* client will accept. Optional and defaults to 65536.
*/
Connection( bool debug = false, u_int32_t max_frame_size = 65536,
- qpid::framing::ProtocolVersion* _version = &(qpid::framing::highestProtocolVersion));
+ framing::ProtocolVersion* _version = &(framing::highestProtocolVersion));
~Connection();
/**
@@ -163,7 +169,7 @@ namespace client {
*/
void removeChannel(Channel* channel);
- virtual void received(qpid::framing::AMQFrame* frame);
+ virtual void received(framing::AMQFrame* frame);
virtual void idleOut();
virtual void idleIn();
diff --git a/cpp/lib/common/Makefile.am b/cpp/lib/common/Makefile.am
index 813c49135e..eefff79d6f 100644
--- a/cpp/lib/common/Makefile.am
+++ b/cpp/lib/common/Makefile.am
@@ -65,6 +65,7 @@ libqpidcommon_la_SOURCES = \
$(framing)/AMQMethodBody.cpp \
$(framing)/BasicHeaderProperties.cpp \
$(framing)/BodyHandler.cpp \
+ $(framing)/ChannelAdapter.cpp \
$(framing)/Buffer.cpp \
$(framing)/FieldTable.cpp \
$(framing)/FramingContent.cpp \
@@ -96,6 +97,7 @@ nobase_pkginclude_HEADERS = \
$(framing)/AMQMethodBody.h \
$(framing)/BasicHeaderProperties.h \
$(framing)/BodyHandler.h \
+ $(framing)/ChannelAdapter.h \
$(framing)/Buffer.h \
$(framing)/FieldTable.h \
$(framing)/FramingContent.h \
@@ -119,7 +121,7 @@ nobase_pkginclude_HEADERS = \
sys/Monitor.h \
sys/Mutex.h \
sys/Runnable.h \
- sys/SessionContext.h \
+ sys/ConnectionOutputHandler.h \
sys/ConnectionInputHandler.h \
sys/ConnectionInputHandlerFactory.h \
sys/ShutdownHandler.h \
diff --git a/cpp/lib/common/framing/AMQFrame.cpp b/cpp/lib/common/framing/AMQFrame.cpp
index 8ac5199c45..c6837af668 100644
--- a/cpp/lib/common/framing/AMQFrame.cpp
+++ b/cpp/lib/common/framing/AMQFrame.cpp
@@ -28,15 +28,15 @@ using namespace qpid::framing;
AMQP_MethodVersionMap AMQFrame::versionMap;
-AMQFrame::AMQFrame(qpid::framing::ProtocolVersion& _version):
+AMQFrame::AMQFrame(const qpid::framing::ProtocolVersion& _version):
version(_version)
{}
-AMQFrame::AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t _channel, AMQBody* _body) :
+AMQFrame::AMQFrame(const qpid::framing::ProtocolVersion& _version, u_int16_t _channel, AMQBody* _body) :
version(_version), channel(_channel), body(_body)
{}
-AMQFrame::AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t _channel, const AMQBody::shared_ptr& _body) :
+AMQFrame::AMQFrame(const qpid::framing::ProtocolVersion& _version, u_int16_t _channel, const AMQBody::shared_ptr& _body) :
version(_version), channel(_channel), body(_body)
{}
diff --git a/cpp/lib/common/framing/AMQFrame.h b/cpp/lib/common/framing/AMQFrame.h
index c27de70e5a..f3c3232d56 100644
--- a/cpp/lib/common/framing/AMQFrame.h
+++ b/cpp/lib/common/framing/AMQFrame.h
@@ -41,9 +41,9 @@ namespace framing {
class AMQFrame : virtual public AMQDataBlock
{
public:
- AMQFrame(qpid::framing::ProtocolVersion& _version = highestProtocolVersion);
- AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t channel, AMQBody* body);
- AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t channel, const AMQBody::shared_ptr& body);
+ AMQFrame(const qpid::framing::ProtocolVersion& _version = highestProtocolVersion);
+ AMQFrame(const qpid::framing::ProtocolVersion& _version, u_int16_t channel, AMQBody* body);
+ AMQFrame(const qpid::framing::ProtocolVersion& _version, u_int16_t channel, const AMQBody::shared_ptr& body);
virtual ~AMQFrame();
virtual void encode(Buffer& buffer);
virtual bool decode(Buffer& buffer);
diff --git a/cpp/lib/common/framing/AMQMethodBody.cpp b/cpp/lib/common/framing/AMQMethodBody.cpp
index 0c77a1c64a..73b729b945 100644
--- a/cpp/lib/common/framing/AMQMethodBody.cpp
+++ b/cpp/lib/common/framing/AMQMethodBody.cpp
@@ -18,6 +18,7 @@
* under the License.
*
*/
+#include <AMQFrame.h>
#include <AMQMethodBody.h>
#include <QpidError.h>
#include "AMQP_MethodVersionMap.h"
@@ -59,5 +60,8 @@ void AMQMethodBody::decode(Buffer& buffer, u_int32_t /*size*/) {
decodeContent(buffer);
}
+void AMQMethodBody::send(const MethodContext& context) {
+ context.out->send(new AMQFrame(version, context.channelId, this));
+}
}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/AMQMethodBody.h b/cpp/lib/common/framing/AMQMethodBody.h
index 9f859046f8..ff09ee60e1 100644
--- a/cpp/lib/common/framing/AMQMethodBody.h
+++ b/cpp/lib/common/framing/AMQMethodBody.h
@@ -53,6 +53,13 @@ class AMQMethodBody : public AMQBody
virtual void invoke(AMQP_ServerOperations&, const MethodContext&);
bool match(AMQMethodBody* other) const;
+
+ /**
+ * Wrap this method in a frame and send using the current context.
+ * Note the frame takes ownership of the body, it will be deleted.
+ */
+ virtual void send(const MethodContext& context);
+
protected:
static u_int32_t baseSize() { return 4; }
diff --git a/cpp/lib/common/framing/AMQResponseBody.cpp b/cpp/lib/common/framing/AMQResponseBody.cpp
index c64b1325d6..dffbb62aca 100644
--- a/cpp/lib/common/framing/AMQResponseBody.cpp
+++ b/cpp/lib/common/framing/AMQResponseBody.cpp
@@ -16,6 +16,7 @@
*
*/
+#include "AMQFrame.h"
#include "AMQResponseBody.h"
#include "AMQP_MethodVersionMap.h"
@@ -61,5 +62,11 @@ void AMQResponseBody::printPrefix(std::ostream& out) const {
<< ",batch=" << data.batchOffset << "): ";
}
+void AMQResponseBody::send(const MethodContext& context) {
+ setRequestId(context.requestId);
+ assert(context.out);
+ context.out->send(
+ new AMQFrame(version, context.channelId, this));
+}
}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/AMQResponseBody.h b/cpp/lib/common/framing/AMQResponseBody.h
index 6528613a12..2520a481f2 100644
--- a/cpp/lib/common/framing/AMQResponseBody.h
+++ b/cpp/lib/common/framing/AMQResponseBody.h
@@ -65,6 +65,11 @@ class AMQResponseBody : public AMQMethodBody
ResponseId getResponseId() { return data.responseId; }
RequestId getRequestId() { return data.requestId; }
BatchOffset getBatchOffset() { return data.batchOffset; }
+ void setResponseId(ResponseId id) { data.responseId = id; }
+ void setRequestId(RequestId id) { data.requestId = id; }
+ void setBatchOffset(BatchOffset id) { data.batchOffset = id; }
+
+ virtual void send(const MethodContext& context);
protected:
static const u_int32_t baseSize() { return AMQMethodBody::baseSize()+20; }
diff --git a/cpp/lib/common/framing/BodyHandler.cpp b/cpp/lib/common/framing/BodyHandler.cpp
index 72ba82468f..5dd0c0c23d 100644
--- a/cpp/lib/common/framing/BodyHandler.cpp
+++ b/cpp/lib/common/framing/BodyHandler.cpp
@@ -58,22 +58,3 @@ void BodyHandler::handleBody(shared_ptr<AMQBody> body) {
}
}
-void BodyHandler::handleRequest(AMQRequestBody::shared_ptr request) {
- responder.received(request->getData());
- handleMethod(request);
-}
-
-void BodyHandler::handleResponse(AMQResponseBody::shared_ptr response) {
- handleMethod(response);
- requester.processed(response->getData());
-}
-
-void BodyHandler::assertChannelZero(u_int16_t id) {
- if (id != 0)
- throw ConnectionException(504, "Invalid channel id, not 0");
-}
-
-void BodyHandler::assertChannelNonZero(u_int16_t id) {
- if (id == 0)
- throw ConnectionException(504, "Invalid channel id 0");
-}
diff --git a/cpp/lib/common/framing/BodyHandler.h b/cpp/lib/common/framing/BodyHandler.h
index c9c74e2b3f..cb3f0997b0 100644
--- a/cpp/lib/common/framing/BodyHandler.h
+++ b/cpp/lib/common/framing/BodyHandler.h
@@ -38,35 +38,21 @@ class AMQContentBody;
class AMQHeartbeatBody;
/**
- * Base class for client and broker channel handlers.
- *
- * Handles request/response id management common to client and broker.
- * Derived classes provide remaining client/broker specific handling.
+ * Interface to handle incoming frame bodies.
+ * Derived classes provide logic for each frame type.
*/
class BodyHandler {
public:
virtual ~BodyHandler();
-
- void handleBody(boost::shared_ptr<AMQBody> body);
+ virtual void handleBody(boost::shared_ptr<AMQBody> body);
protected:
- virtual void handleRequest(boost::shared_ptr<AMQRequestBody>);
- virtual void handleResponse(boost::shared_ptr<AMQResponseBody>);
-
+ virtual void handleRequest(boost::shared_ptr<AMQRequestBody>) = 0;
+ virtual void handleResponse(boost::shared_ptr<AMQResponseBody>) = 0;
virtual void handleMethod(boost::shared_ptr<AMQMethodBody>) = 0;
virtual void handleHeader(boost::shared_ptr<AMQHeaderBody>) = 0;
virtual void handleContent(boost::shared_ptr<AMQContentBody>) = 0;
virtual void handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) = 0;
-
- protected:
- /** Throw protocol exception if this is not channel 0. */
- static void assertChannelZero(u_int16_t id);
- /** Throw protocol exception if this is channel 0. */
- static void assertChannelNonZero(u_int16_t id);
-
- private:
- Requester requester;
- Responder responder;
};
}}
diff --git a/cpp/lib/common/framing/ChannelAdapter.cpp b/cpp/lib/common/framing/ChannelAdapter.cpp
new file mode 100644
index 0000000000..cf6fea1455
--- /dev/null
+++ b/cpp/lib/common/framing/ChannelAdapter.cpp
@@ -0,0 +1,70 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "ChannelAdapter.h"
+#include "AMQFrame.h"
+
+namespace qpid {
+namespace framing {
+
+void ChannelAdapter::send(AMQFrame* frame) {
+ AMQBody::shared_ptr body = frame->getBody();
+ switch (body->type()) {
+ case REQUEST_BODY: {
+ AMQRequestBody::shared_ptr request =
+ boost::shared_polymorphic_downcast<AMQRequestBody>(body);
+ requester.sending(request->getData());
+ break;
+ }
+ case RESPONSE_BODY: {
+ AMQResponseBody::shared_ptr response =
+ boost::shared_polymorphic_downcast<AMQResponseBody>(body);
+ responder.sending(response->getData());
+ break;
+ }
+ }
+ out.send(frame);
+}
+
+void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) {
+ responder.received(request->getData());
+ MethodContext context(id, &out, request->getRequestId());
+ handleMethodInContext(request, context);
+}
+
+void ChannelAdapter::handleResponse(AMQResponseBody::shared_ptr response) {
+ handleMethod(response);
+ requester.processed(response->getData());
+}
+
+void ChannelAdapter::handleMethod(AMQMethodBody::shared_ptr method) {
+ MethodContext context(id, this);
+ handleMethodInContext(method, context);
+}
+
+void ChannelAdapter::assertChannelZero(u_int16_t id) {
+ if (id != 0)
+ throw ConnectionException(504, "Invalid channel id, not 0");
+}
+
+void ChannelAdapter::assertChannelNonZero(u_int16_t id) {
+ if (id == 0)
+ throw ConnectionException(504, "Invalid channel id 0");
+}
+
+}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/ChannelAdapter.h b/cpp/lib/common/framing/ChannelAdapter.h
new file mode 100644
index 0000000000..0652cc41bb
--- /dev/null
+++ b/cpp/lib/common/framing/ChannelAdapter.h
@@ -0,0 +1,90 @@
+#ifndef _ChannelAdapter_
+#define _ChannelAdapter_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+
+#include "BodyHandler.h"
+#include "Requester.h"
+#include "Responder.h"
+#include "OutputHandler.h"
+
+namespace qpid {
+namespace framing {
+
+class MethodContext;
+
+/**
+ * Base class for client and broker channel adapters.
+ *
+ * As BodyHandler:
+ * - Creates MethodContext and dispatches methods+context to derived class.
+ * - Updates request/response ID data.
+ *
+ * As OutputHandler:
+ * - Updates request/resposne ID data.
+ *
+ */
+class ChannelAdapter : public BodyHandler, public OutputHandler {
+ public:
+ /**
+ *@param output Processed frames are forwarded to this handler.
+ */
+ ChannelAdapter(OutputHandler& output, ChannelId channelId)
+ : id(channelId), out(output) {}
+
+ ChannelId getId() { return id; }
+
+ /**
+ * Do request/response-id processing and then forward to
+ * handler provided to constructor. Response frames should
+ * have their request-id set before calling send.
+ */
+ void send(AMQFrame* frame);
+
+ void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody>);
+ void handleRequest(boost::shared_ptr<qpid::framing::AMQRequestBody>);
+ void handleResponse(boost::shared_ptr<qpid::framing::AMQResponseBody>);
+
+ protected:
+ /** Throw protocol exception if this is not channel 0. */
+ static void assertChannelZero(u_int16_t id);
+ /** Throw protocol exception if this is channel 0. */
+ static void assertChannelNonZero(u_int16_t id);
+
+ virtual void handleMethodInContext(
+ boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+ const MethodContext& context) = 0;
+
+ ChannelId id;
+
+ private:
+ Requester requester;
+ Responder responder;
+ OutputHandler& out;
+};
+
+}}
+
+
+#endif
diff --git a/cpp/lib/common/framing/MethodContext.h b/cpp/lib/common/framing/MethodContext.h
index 13d5f658ca..1aa4be8f1e 100644
--- a/cpp/lib/common/framing/MethodContext.h
+++ b/cpp/lib/common/framing/MethodContext.h
@@ -19,6 +19,9 @@
*
*/
+#include "OutputHandler.h"
+#include "ProtocolVersion.h"
+
namespace qpid {
namespace framing {
@@ -26,11 +29,14 @@ class BodyHandler;
/**
* Invocation context for an AMQP method.
+ * Some of the context information is related to the channel, some
+ * to the specific invocation - e.g. requestId.
+ *
* All generated proxy and handler functions take a MethodContext parameter.
*
- * The user calling on a broker proxy can simply pass an integer
- * channel ID, it will implicitly be converted to an appropriate context.
- *
+ * The user does not need to create MethodContext objects explicitly,
+ * the constructor will implicitly create one from a channel ID.
+ *
* Other context members are for internal use.
*/
struct MethodContext
@@ -39,13 +45,21 @@ struct MethodContext
* Passing a integer channel-id in place of a MethodContext
* will automatically construct the MethodContext.
*/
- MethodContext(ChannelId channel, RequestId request=0)
- : channelId(channel), requestId(request) {}
+ MethodContext(
+ ChannelId channel, OutputHandler* output=0, RequestId request=0)
+ : channelId(channel), out(output), requestId(request){}
+
+ /** \internal Channel on which the method is sent. */
+ const ChannelId channelId;
+
+ /** Output handler for responses in this context */
+ OutputHandler* out;
+
+ /** \internal If we are in the context of processing an incoming request,
+ * this is the ID. Otherwise it is 0.
+ */
+ const RequestId requestId;
- /** Channel on which the method is sent. */
- ChannelId channelId;
- /** \internal For proxy response: the original request or 0. */
- RequestId requestId;
};
}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/OutputHandler.h b/cpp/lib/common/framing/OutputHandler.h
index 2e01e34df2..9ffd4227d8 100644
--- a/cpp/lib/common/framing/OutputHandler.h
+++ b/cpp/lib/common/framing/OutputHandler.h
@@ -22,10 +22,10 @@
*
*/
#include <boost/noncopyable.hpp>
-#include <AMQFrame.h>
namespace qpid {
namespace framing {
+class AMQFrame;
class OutputHandler : private boost::noncopyable {
public:
diff --git a/cpp/lib/common/sys/ConnectionInputHandlerFactory.h b/cpp/lib/common/sys/ConnectionInputHandlerFactory.h
index 5bb5e17704..af7d411928 100644
--- a/cpp/lib/common/sys/ConnectionInputHandlerFactory.h
+++ b/cpp/lib/common/sys/ConnectionInputHandlerFactory.h
@@ -26,7 +26,7 @@
namespace qpid {
namespace sys {
-class SessionContext;
+class ConnectionOutputHandler;
class ConnectionInputHandler;
/**
@@ -36,7 +36,7 @@ class ConnectionInputHandler;
class ConnectionInputHandlerFactory : private boost::noncopyable
{
public:
- virtual ConnectionInputHandler* create(SessionContext* ctxt) = 0;
+ virtual ConnectionInputHandler* create(ConnectionOutputHandler* ctxt) = 0;
virtual ~ConnectionInputHandlerFactory(){}
};
diff --git a/cpp/lib/common/sys/SessionContext.h b/cpp/lib/common/sys/ConnectionOutputHandler.h
index 671e00774f..91849e1dfb 100644
--- a/cpp/lib/common/sys/SessionContext.h
+++ b/cpp/lib/common/sys/ConnectionOutputHandler.h
@@ -18,8 +18,8 @@
* under the License.
*
*/
-#ifndef _SessionContext_
-#define _SessionContext_
+#ifndef _ConnectionOutputHandler_
+#define _ConnectionOutputHandler_
#include <OutputHandler.h>
@@ -29,7 +29,7 @@ namespace sys {
/**
* Provides the output handler associated with a connection.
*/
-class SessionContext : public virtual qpid::framing::OutputHandler
+class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler
{
public:
virtual void close() = 0;
diff --git a/cpp/lib/common/sys/apr/LFSessionContext.h b/cpp/lib/common/sys/apr/LFSessionContext.h
index 8cf50b87ba..81cfc0efda 100644
--- a/cpp/lib/common/sys/apr/LFSessionContext.h
+++ b/cpp/lib/common/sys/apr/LFSessionContext.h
@@ -30,7 +30,7 @@
#include <AMQFrame.h>
#include <Buffer.h>
#include <sys/Monitor.h>
-#include <sys/SessionContext.h>
+#include <sys/ConnectionOutputHandler.h>
#include <sys/ConnectionInputHandler.h>
#include "APRSocket.h"
@@ -40,7 +40,7 @@ namespace qpid {
namespace sys {
-class LFSessionContext : public virtual qpid::sys::SessionContext
+class LFSessionContext : public virtual qpid::sys::ConnectionOutputHandler
{
const bool debug;
APRSocket socket;
diff --git a/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp b/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp
index 787d12d6d1..548fbd1881 100644
--- a/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp
+++ b/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp
@@ -26,7 +26,7 @@
#include <boost/bind.hpp>
#include <boost/scoped_ptr.hpp>
-#include <sys/SessionContext.h>
+#include <sys/ConnectionOutputHandler.h>
#include <sys/ConnectionInputHandler.h>
#include <sys/ConnectionInputHandlerFactory.h>
#include <sys/Acceptor.h>
diff --git a/cpp/lib/common/sys/posix/EventChannelConnection.h b/cpp/lib/common/sys/posix/EventChannelConnection.h
index 1504e92651..da7b6dca27 100644
--- a/cpp/lib/common/sys/posix/EventChannelConnection.h
+++ b/cpp/lib/common/sys/posix/EventChannelConnection.h
@@ -23,7 +23,7 @@
#include "EventChannelThreads.h"
#include "sys/Monitor.h"
-#include "sys/SessionContext.h"
+#include "sys/ConnectionOutputHandler.h"
#include "sys/ConnectionInputHandler.h"
#include "sys/AtomicCount.h"
#include "framing/AMQFrame.h"
@@ -34,13 +34,13 @@ namespace sys {
class ConnectionInputHandlerFactory;
/**
- * Implements SessionContext and delegates to a ConnectionInputHandler
+ * Implements ConnectionOutputHandler and delegates to a ConnectionInputHandler
* for a connection via the EventChannel.
*@param readDescriptor file descriptor for reading.
*@param writeDescriptor file descriptor for writing,
* by default same as readDescriptor
*/
-class EventChannelConnection : public SessionContext {
+class EventChannelConnection : public ConnectionOutputHandler {
public:
EventChannelConnection(
EventChannelThreads::shared_ptr threads,
@@ -50,7 +50,7 @@ class EventChannelConnection : public SessionContext {
bool isTrace = false
);
- // TODO aconway 2006-11-30: SessionContext::send should take auto_ptr
+ // TODO aconway 2006-11-30: ConnectionOutputHandler::send should take auto_ptr
virtual void send(qpid::framing::AMQFrame* frame) {
send(std::auto_ptr<qpid::framing::AMQFrame>(frame));
}
diff --git a/cpp/lib/common/sys/posix/check.h b/cpp/lib/common/sys/posix/check.h
index 5afbe8f5a8..57b5a5757c 100644
--- a/cpp/lib/common/sys/posix/check.h
+++ b/cpp/lib/common/sys/posix/check.h
@@ -45,7 +45,7 @@ class PosixError : public qpid::QpidError
Exception* clone() const throw() { return new PosixError(*this); }
- void throwSelf() { throw *this; }
+ void throwSelf() const { throw *this; }
private:
int errNo;
diff --git a/cpp/tests/ChannelTest.cpp b/cpp/tests/ChannelTest.cpp
index e7a02d631d..b31ff6a321 100644
--- a/cpp/tests/ChannelTest.cpp
+++ b/cpp/tests/ChannelTest.cpp
@@ -27,7 +27,7 @@
#include <iostream>
#include <memory>
#include <AMQP_HighestVersion.h>
-
+#include "AMQFrame.h"
using namespace boost;
using namespace qpid::broker;
@@ -107,6 +107,9 @@ class ChannelTest : public CppUnit::TestCase
handle(call);
}
+ // Don't hide overloads.
+ using NullMessageStore::destroy;
+
void destroy(Message* msg)
{
MethodCall call = {"destroy", msg, ""};
diff --git a/cpp/tests/InMemoryContentTest.cpp b/cpp/tests/InMemoryContentTest.cpp
index bd638dae66..1494518578 100644
--- a/cpp/tests/InMemoryContentTest.cpp
+++ b/cpp/tests/InMemoryContentTest.cpp
@@ -23,6 +23,7 @@
#include <AMQP_HighestVersion.h>
#include <iostream>
#include <list>
+#include "AMQFrame.h"
using std::list;
using std::string;
diff --git a/cpp/tests/LazyLoadedContentTest.cpp b/cpp/tests/LazyLoadedContentTest.cpp
index 2075a6dd3a..624d2be3ff 100644
--- a/cpp/tests/LazyLoadedContentTest.cpp
+++ b/cpp/tests/LazyLoadedContentTest.cpp
@@ -25,6 +25,7 @@
#include <iostream>
#include <list>
#include <sstream>
+#include "AMQFrame.h"
using std::list;
using std::string;
diff --git a/cpp/tests/MessageBuilderTest.cpp b/cpp/tests/MessageBuilderTest.cpp
index 88e8318832..21f5935218 100644
--- a/cpp/tests/MessageBuilderTest.cpp
+++ b/cpp/tests/MessageBuilderTest.cpp
@@ -71,6 +71,9 @@ class MessageBuilderTest : public CppUnit::TestCase
}
}
+ // Don't hide overloads.
+ using NullMessageStore::destroy;
+
void destroy(Message* msg)
{
CPPUNIT_ASSERT(msg->getPersistenceId());
diff --git a/cpp/tests/MessageTest.cpp b/cpp/tests/MessageTest.cpp
index bcf3ad8064..8bb570e598 100644
--- a/cpp/tests/MessageTest.cpp
+++ b/cpp/tests/MessageTest.cpp
@@ -22,6 +22,7 @@
#include <qpid_test_plugin.h>
#include <iostream>
#include <AMQP_HighestVersion.h>
+#include "AMQFrame.h"
using namespace boost;
using namespace qpid::broker;
diff --git a/cpp/tests/MockConnectionInputHandler.h b/cpp/tests/MockConnectionInputHandler.h
index 522df0900f..b039e244d9 100644
--- a/cpp/tests/MockConnectionInputHandler.h
+++ b/cpp/tests/MockConnectionInputHandler.h
@@ -89,7 +89,7 @@ struct MockConnectionInputHandler : public qpid::sys::ConnectionInputHandler {
struct MockConnectionInputHandlerFactory : public qpid::sys::ConnectionInputHandlerFactory {
MockConnectionInputHandlerFactory() : handler(0) {}
- qpid::sys::ConnectionInputHandler* create(qpid::sys::SessionContext*) {
+ qpid::sys::ConnectionInputHandler* create(qpid::sys::ConnectionOutputHandler*) {
qpid::sys::Monitor::ScopedLock lock(monitor);
handler = new MockConnectionInputHandler();
monitor.notifyAll();