diff options
Diffstat (limited to 'cpp')
43 files changed, 422 insertions, 200 deletions
diff --git a/cpp/configure.ac b/cpp/configure.ac index 058bce148f..7359ba34e6 100644 --- a/cpp/configure.ac +++ b/cpp/configure.ac @@ -66,6 +66,7 @@ if test "${enableval}" = yes; then gl_COMPILER_FLAGS(-Wvolatile-register-var) gl_COMPILER_FLAGS(-Winvalid-pch) gl_COMPILER_FLAGS(-Wno-system-headers) + gl_COMPILER_FLAGS(-Woverloaded-virtual) AC_SUBST([WARNING_CFLAGS], [$COMPILER_FLAGS]) AC_DEFINE([lint], 1, [Define to 1 if the compiler is checking for lint.]) COMPILER_FLAGS= diff --git a/cpp/lib/broker/Broker.cpp b/cpp/lib/broker/Broker.cpp index 079eb5fd73..b9e7990861 100644 --- a/cpp/lib/broker/Broker.cpp +++ b/cpp/lib/broker/Broker.cpp @@ -30,7 +30,6 @@ #include "NullMessageStore.h" #include "ProtocolInitiation.h" #include "Connection.h" -#include "sys/SessionContext.h" #include "sys/ConnectionInputHandler.h" #include "sys/ConnectionInputHandlerFactory.h" #include "sys/TimeoutHandler.h" @@ -97,7 +96,9 @@ void Broker::shutdown() { acceptor->shutdown(); } -Broker::~Broker() { } +Broker::~Broker() { + shutdown(); +} const int16_t Broker::DEFAULT_PORT(5672); diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp index 0d34868710..fda7d15784 100644 --- a/cpp/lib/broker/BrokerAdapter.cpp +++ b/cpp/lib/broker/BrokerAdapter.cpp @@ -181,9 +181,9 @@ class BrokerAdapter::ServerOps : public AMQP_ServerOperations }; void BrokerAdapter::ServerOps::ConnectionHandlerImpl::startOk( - const MethodContext& , const FieldTable& /*clientProperties*/, const string& /*mechanism*/, + const MethodContext& context , const FieldTable& /*clientProperties*/, const string& /*mechanism*/, const string& /*response*/, const string& /*locale*/){ - connection.client->getConnection().tune(0, 100, connection.framemax, connection.heartbeat); + connection.client->getConnection().tune(context, 100, connection.framemax, connection.heartbeat); } void BrokerAdapter::ServerOps::ConnectionHandlerImpl::secureOk(const MethodContext&, const string& /*response*/){} @@ -193,40 +193,40 @@ void BrokerAdapter::ServerOps::ConnectionHandlerImpl::tuneOk(const MethodContext connection.heartbeat = heartbeat; } -void BrokerAdapter::ServerOps::ConnectionHandlerImpl::open(const MethodContext&, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){ +void BrokerAdapter::ServerOps::ConnectionHandlerImpl::open(const MethodContext& context, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){ string knownhosts; - connection.client->getConnection().openOk(0, knownhosts); + connection.client->getConnection().openOk(context, knownhosts); } void BrokerAdapter::ServerOps::ConnectionHandlerImpl::close( - const MethodContext&, u_int16_t /*replyCode*/, const string& /*replyText*/, + const MethodContext& context, u_int16_t /*replyCode*/, const string& /*replyText*/, u_int16_t /*classId*/, u_int16_t /*methodId*/) { - connection.client->getConnection().closeOk(0); - connection.context->close(); + connection.client->getConnection().closeOk(context); + connection.getOutput().close(); } void BrokerAdapter::ServerOps::ConnectionHandlerImpl::closeOk(const MethodContext&){ - connection.context->close(); + connection.getOutput().close(); } void BrokerAdapter::ServerOps::ChannelHandlerImpl::open( - const MethodContext&, const string& /*outOfBand*/){ + const MethodContext& context, const string& /*outOfBand*/){ // FIXME aconway 2007-01-17: Assertions on all channel methods, assertChannelNonZero(channel.getId()); if (channel.isOpen()) throw ConnectionException(504, "Channel already open"); channel.open(); // FIXME aconway 2007-01-04: provide valid channel Id as per ampq 0-9 - connection.client->getChannel().openOk(channel.getId(), std::string()/* ID */); + connection.client->getChannel().openOk(context, std::string()/* ID */); } void BrokerAdapter::ServerOps::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){} void BrokerAdapter::ServerOps::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){} -void BrokerAdapter::ServerOps::ChannelHandlerImpl::close(const MethodContext&, u_int16_t /*replyCode*/, const string& /*replyText*/, +void BrokerAdapter::ServerOps::ChannelHandlerImpl::close(const MethodContext& context, u_int16_t /*replyCode*/, const string& /*replyText*/, u_int16_t /*classId*/, u_int16_t /*methodId*/){ - connection.client->getChannel().closeOk(channel.getId()); + connection.client->getChannel().closeOk(context); // FIXME aconway 2007-01-18: Following line destroys this. Ugly. connection.closeChannel(channel.getId()); } @@ -235,7 +235,7 @@ void BrokerAdapter::ServerOps::ChannelHandlerImpl::closeOk(const MethodContext&) -void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(const MethodContext&, u_int16_t /*ticket*/, const string& exchange, const string& type, +void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, const string& type, bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, const FieldTable& /*arguments*/){ @@ -258,19 +258,19 @@ void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(const MethodContext& } } if(!nowait){ - connection.client->getExchange().declareOk(channel.getId()); + connection.client->getExchange().declareOk(context); } } -void BrokerAdapter::ServerOps::ExchangeHandlerImpl::delete_(const MethodContext&, u_int16_t /*ticket*/, +void BrokerAdapter::ServerOps::ExchangeHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, bool /*ifUnused*/, bool nowait){ //TODO: implement unused broker.getExchanges().destroy(exchange); - if(!nowait) connection.client->getExchange().deleteOk(channel.getId()); + if(!nowait) connection.client->getExchange().deleteOk(context); } -void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(const MethodContext&, u_int16_t /*ticket*/, const string& name, +void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& name, bool passive, bool durable, bool exclusive, bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ Queue::shared_ptr queue; @@ -301,11 +301,11 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(const MethodContext&, u } if (!nowait) { string queueName = queue->getName(); - connection.client->getQueue().declareOk(channel.getId(), queueName, queue->getMessageCount(), queue->getConsumerCount()); + connection.client->getQueue().declareOk(context, queueName, queue->getMessageCount(), queue->getConsumerCount()); } } -void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(const MethodContext&, u_int16_t /*ticket*/, const string& queueName, +void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, const string& exchangeName, const string& routingKey, bool nowait, const FieldTable& arguments){ @@ -314,7 +314,7 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(const MethodContext&, u_in if(exchange){ string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; exchange->bind(queue, exchangeRoutingKey, &arguments); - if(!nowait) connection.client->getQueue().bindOk(channel.getId()); + if(!nowait) connection.client->getQueue().bindOk(context); }else{ throw ChannelException( 404, "Bind failed. No such exchange: " + exchangeName); @@ -341,14 +341,14 @@ BrokerAdapter::ServerOps::QueueHandlerImpl::unbind( connection.client->getQueue().unbindOk(channel.getId()); } -void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(const MethodContext&, u_int16_t /*ticket*/, const string& queueName, bool nowait){ +void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); int count = queue->purge(); - if(!nowait) connection.client->getQueue().purgeOk(channel.getId(), count); + if(!nowait) connection.client->getQueue().purgeOk(context, count); } -void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(const MethodContext&, u_int16_t /*ticket*/, const string& queue, +void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& queue, bool ifUnused, bool ifEmpty, bool nowait){ ChannelException error(0, ""); int count(0); @@ -368,21 +368,21 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(const MethodContext&, u broker.getQueues().destroy(queue); } - if(!nowait) connection.client->getQueue().deleteOk(channel.getId(), count); + if(!nowait) connection.client->getQueue().deleteOk(context, count); } -void BrokerAdapter::ServerOps::BasicHandlerImpl::qos(const MethodContext&, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ +void BrokerAdapter::ServerOps::BasicHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ //TODO: handle global channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); - connection.client->getBasic().qosOk(channel.getId()); + connection.client->getBasic().qosOk(context); } void BrokerAdapter::ServerOps::BasicHandlerImpl::consume( - const MethodContext&, u_int16_t /*ticket*/, + const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, const string& consumerTag, bool noLocal, bool noAck, bool exclusive, bool nowait, const FieldTable& fields) @@ -398,7 +398,7 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::consume( channel.consume( newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields); - if(!nowait) connection.client->getBasic().consumeOk(channel.getId(), newTag); + if(!nowait) connection.client->getBasic().consumeOk(context, newTag); //allow messages to be dispatched if required as there is now a consumer: queue->dispatch(); @@ -409,10 +409,10 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::consume( } -void BrokerAdapter::ServerOps::BasicHandlerImpl::cancel(const MethodContext&, const string& consumerTag, bool nowait){ +void BrokerAdapter::ServerOps::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){ channel.cancel(consumerTag); - if(!nowait) connection.client->getBasic().cancelOk(channel.getId(), consumerTag); + if(!nowait) connection.client->getBasic().cancelOk(context, consumerTag); } void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(const MethodContext&, u_int16_t /*ticket*/, @@ -429,12 +429,12 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(const MethodContext&, u } } -void BrokerAdapter::ServerOps::BasicHandlerImpl::get(const MethodContext&, u_int16_t /*ticket*/, const string& queueName, bool noAck){ +void BrokerAdapter::ServerOps::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); if(!connection.getChannel(channel.getId()).get(queue, !noAck)){ string clusterId;//not used, part of an imatix hack - connection.client->getBasic().getEmpty(channel.getId(), clusterId); + connection.client->getBasic().getEmpty(context, clusterId); } } @@ -452,20 +452,20 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::recover(const MethodContext&, b channel.recover(requeue); } -void BrokerAdapter::ServerOps::TxHandlerImpl::select(const MethodContext&){ +void BrokerAdapter::ServerOps::TxHandlerImpl::select(const MethodContext& context){ channel.begin(); - connection.client->getTx().selectOk(channel.getId()); + connection.client->getTx().selectOk(context); } -void BrokerAdapter::ServerOps::TxHandlerImpl::commit(const MethodContext&){ +void BrokerAdapter::ServerOps::TxHandlerImpl::commit(const MethodContext& context){ channel.commit(); - connection.client->getTx().commitOk(channel.getId()); + connection.client->getTx().commitOk(context); } -void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(const MethodContext&){ +void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(const MethodContext& context){ channel.rollback(); - connection.client->getTx().rollbackOk(channel.getId()); + connection.client->getTx().rollbackOk(context); channel.recover(false); } @@ -499,6 +499,7 @@ BrokerAdapter::ServerOps::ChannelHandlerImpl::resume( BrokerAdapter::BrokerAdapter( Channel* ch, Connection& c, Broker& b ) : + ChannelAdapter(c.getOutput(), ch->getId()), channel(ch), connection(c), broker(b), @@ -507,24 +508,25 @@ BrokerAdapter::BrokerAdapter( assert(ch); } -void BrokerAdapter::handleMethod( - boost::shared_ptr<qpid::framing::AMQMethodBody> method) +void BrokerAdapter::handleMethodInContext( + boost::shared_ptr<qpid::framing::AMQMethodBody> method, + const MethodContext& context +) { try{ - // FIXME aconway 2007-01-17: invoke to take Channel&? - method->invoke(*serverOps, channel->getId()); + method->invoke(*serverOps, context); }catch(ChannelException& e){ - connection.closeChannel(channel->getId()); + connection.closeChannel(getId()); connection.client->getChannel().close( - channel->getId(), e.code, e.toString(), + context, e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); }catch(ConnectionException& e){ connection.client->getConnection().close( - 0, e.code, e.toString(), + context, e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); }catch(std::exception& e){ connection.client->getConnection().close( - 0, 541/*internal error*/, e.what(), + context, 541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); } } diff --git a/cpp/lib/broker/BrokerAdapter.h b/cpp/lib/broker/BrokerAdapter.h index 5309d767f5..37eba7d52b 100644 --- a/cpp/lib/broker/BrokerAdapter.h +++ b/cpp/lib/broker/BrokerAdapter.h @@ -22,6 +22,7 @@ #include "AMQP_ServerOperations.h" #include "BodyHandler.h" #include "BrokerChannel.h" +#include "amqp_types.h" namespace qpid { namespace broker { @@ -40,19 +41,22 @@ class Broker; * * Owns a channel, has references to Connection and Broker. */ -class BrokerAdapter : public qpid::framing::BodyHandler +class BrokerAdapter : public qpid::framing::ChannelAdapter { public: // FIXME aconway 2007-01-18: takes ownership, should pass auto_ptr<Channel> BrokerAdapter(Channel* ch, Connection&, Broker&); Channel& getChannel() { return *channel; } - void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody>); void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>); void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>); void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>); private: + void handleMethodInContext( + boost::shared_ptr<qpid::framing::AMQMethodBody> method, + const framing::MethodContext& context); + class ServerOps; std::auto_ptr<Channel> channel; diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h index dabf6ebe40..7cfb96241d 100644 --- a/cpp/lib/broker/BrokerChannel.h +++ b/cpp/lib/broker/BrokerChannel.h @@ -45,7 +45,9 @@ #include <OutputHandler.h> #include <AMQContentBody.h> #include <AMQHeaderBody.h> +#include <AMQHeartbeatBody.h> #include <BasicPublishBody.h> +#include "ChannelAdapter.h" namespace qpid { namespace broker { @@ -56,8 +58,7 @@ using qpid::framing::string; * Maintains state for an AMQP channel. Handles incoming and * outgoing messages for that channel. */ -class Channel : private MessageBuilder::CompletionHandler -{ +class Channel : private MessageBuilder::CompletionHandler { class ConsumerImpl : public virtual Consumer { Channel* parent; diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp index 6ba2131a74..07b14a4eff 100644 --- a/cpp/lib/broker/BrokerMessage.cpp +++ b/cpp/lib/broker/BrokerMessage.cpp @@ -26,6 +26,7 @@ #include <MessageStore.h> #include <BasicDeliverBody.h> #include <BasicGetOkBody.h> +#include "AMQFrame.h" using namespace boost; using namespace qpid::broker; diff --git a/cpp/lib/broker/Connection.cpp b/cpp/lib/broker/Connection.cpp index 3db5bcd074..dc8bdeda05 100644 --- a/cpp/lib/broker/Connection.cpp +++ b/cpp/lib/broker/Connection.cpp @@ -31,12 +31,12 @@ using namespace qpid::sys; namespace qpid { namespace broker { -Connection::Connection(SessionContext* context_, Broker& broker_) : - context(context_), +Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) : framemax(65536), heartbeat(0), broker(broker_), - settings(broker.getTimeout(), broker.getStagingThreshold()) + settings(broker.getTimeout(), broker.getStagingThreshold()), + out(out_) {} Queue::shared_ptr Connection::getQueue(const string& name, u_int16_t channel){ @@ -68,14 +68,15 @@ void Connection::initiated(qpid::framing::ProtocolInitiation* header) { // TODO aconway 2007-01-16: correct error code. throw ConnectionException(0, "Connection initiated twice"); client.reset(new qpid::framing::AMQP_ClientProxy( - context, header->getMajor(), header->getMinor())); + out, header->getMajor(), header->getMinor())); FieldTable properties; string mechanisms("PLAIN"); string locales("en_US"); - // TODO aconway 2007-01-16: Move to adapter. + // TODO aconway 2007-01-16: Client call, move to adapter. client->getConnection().start( - 0, header->getMajor(), header->getMinor(), properties, - mechanisms, locales); + MethodContext(0, out), + header->getMajor(), header->getMinor(), + properties, mechanisms, locales); } void Connection::idleOut(){} @@ -105,7 +106,7 @@ BrokerAdapter& Connection::getAdapter(u_int16_t id) { AdapterMap::iterator i = adapters.find(id); if (i == adapters.end()) { Channel* ch=new Channel( - client->getProtocolVersion(), context, id, + client->getProtocolVersion(), out, id, framemax, broker.getQueues().getStore(), settings.stagingThreshold); BrokerAdapter* adapter = new BrokerAdapter(ch, *this, broker); diff --git a/cpp/lib/broker/Connection.h b/cpp/lib/broker/Connection.h index 90346b7c9d..d5d90d4830 100644 --- a/cpp/lib/broker/Connection.h +++ b/cpp/lib/broker/Connection.h @@ -29,7 +29,7 @@ #include <AMQFrame.h> #include <AMQP_ClientProxy.h> #include <AMQP_ServerOperations.h> -#include <sys/SessionContext.h> +#include <sys/ConnectionOutputHandler.h> #include <sys/ConnectionInputHandler.h> #include <sys/TimeoutHandler.h> #include "Broker.h" @@ -50,18 +50,8 @@ class Settings { class Connection : public qpid::sys::ConnectionInputHandler, public ConnectionToken { - typedef boost::ptr_map<u_int16_t, BrokerAdapter> AdapterMap; - - // FIXME aconway 2007-01-16: on broker. - typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; - Exchange::shared_ptr findExchange(const string& name); - - BrokerAdapter& getAdapter(u_int16_t id); - - AdapterMap adapters; - public: - Connection(qpid::sys::SessionContext* context, Broker& broker); + Connection(qpid::sys::ConnectionOutputHandler* out, Broker& broker); // ConnectionInputHandler methods void received(qpid::framing::AMQFrame* frame); void initiated(qpid::framing::ProtocolInitiation* header); @@ -69,8 +59,9 @@ class Connection : public qpid::sys::ConnectionInputHandler, void idleIn(); void closed(); + qpid::sys::ConnectionOutputHandler& getOutput() { return *out; } + // FIXME aconway 2007-01-16: encapsulate. - qpid::sys::SessionContext* context; u_int32_t framemax; u_int16_t heartbeat; Broker& broker; @@ -91,6 +82,19 @@ class Connection : public qpid::sys::ConnectionInputHandler, Channel& newChannel(u_int16_t channel); Channel& getChannel(u_int16_t channel); void closeChannel(u_int16_t channel); + + private: + typedef boost::ptr_map<u_int16_t, BrokerAdapter> AdapterMap; + + // FIXME aconway 2007-01-16: on broker. + typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; + Exchange::shared_ptr findExchange(const string& name); + + BrokerAdapter& getAdapter(u_int16_t id); + + AdapterMap adapters; + qpid::sys::ConnectionOutputHandler* out; + }; }} diff --git a/cpp/lib/broker/ConnectionFactory.cpp b/cpp/lib/broker/ConnectionFactory.cpp index 3c4c7cd724..20485dd0e1 100644 --- a/cpp/lib/broker/ConnectionFactory.cpp +++ b/cpp/lib/broker/ConnectionFactory.cpp @@ -35,9 +35,9 @@ ConnectionFactory::~ConnectionFactory() } qpid::sys::ConnectionInputHandler* -ConnectionFactory::create(qpid::sys::SessionContext* ctxt) +ConnectionFactory::create(qpid::sys::ConnectionOutputHandler* out) { - return new Connection(ctxt, broker); + return new Connection(out, broker); } }} // namespace qpid::broker diff --git a/cpp/lib/broker/ConnectionFactory.h b/cpp/lib/broker/ConnectionFactory.h index fe8052ed9c..b699dd0af4 100644 --- a/cpp/lib/broker/ConnectionFactory.h +++ b/cpp/lib/broker/ConnectionFactory.h @@ -32,7 +32,7 @@ class ConnectionFactory : public qpid::sys::ConnectionInputHandlerFactory public: ConnectionFactory(Broker& b); - virtual qpid::sys::ConnectionInputHandler* create(qpid::sys::SessionContext* ctxt); + virtual qpid::sys::ConnectionInputHandler* create(qpid::sys::ConnectionOutputHandler* ctxt); virtual ~ConnectionFactory(); diff --git a/cpp/lib/broker/InMemoryContent.cpp b/cpp/lib/broker/InMemoryContent.cpp index 07af8633e5..e6db6b0539 100644 --- a/cpp/lib/broker/InMemoryContent.cpp +++ b/cpp/lib/broker/InMemoryContent.cpp @@ -19,6 +19,7 @@ * */ #include <InMemoryContent.h> +#include "AMQFrame.h" using namespace qpid::broker; using namespace qpid::framing; diff --git a/cpp/lib/broker/LazyLoadedContent.cpp b/cpp/lib/broker/LazyLoadedContent.cpp index ec1ca3e195..3bf1b374ea 100644 --- a/cpp/lib/broker/LazyLoadedContent.cpp +++ b/cpp/lib/broker/LazyLoadedContent.cpp @@ -19,6 +19,7 @@ * */ #include <LazyLoadedContent.h> +#include "AMQFrame.h" using namespace qpid::broker; using namespace qpid::framing; diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp index 07ede4097a..33f7a63d45 100644 --- a/cpp/lib/broker/MessageHandlerImpl.cpp +++ b/cpp/lib/broker/MessageHandlerImpl.cpp @@ -64,7 +64,7 @@ MessageHandlerImpl::close(const MethodContext&, } void -MessageHandlerImpl::consume(const MethodContext&, +MessageHandlerImpl::consume(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, const string& destination, @@ -85,7 +85,7 @@ MessageHandlerImpl::consume(const MethodContext&, string newTag = destination; channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); - connection.client->getMessageHandler()->ok(channel.getId()); + connection.client->getMessageHandler()->ok(context); //allow messages to be dispatched if required as there is now a consumer: queue->dispatch(); @@ -102,7 +102,7 @@ MessageHandlerImpl::empty( const MethodContext& ) } void -MessageHandlerImpl::get( const MethodContext&, +MessageHandlerImpl::get( const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, const string& /*destination*/, @@ -110,12 +110,12 @@ MessageHandlerImpl::get( const MethodContext&, { assert(0); // FIXME astitcher 2007-01-11: 0-9 feature - Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); + Queue::shared_ptr queue = + connection.getQueue(queueName, context.channelId); // FIXME: get is probably Basic specific - if(!connection.getChannel(channel.getId()).get(queue, !noAck)){ - - connection.client->getMessageHandler()->empty(channel.getId()); + if(!channel.get(queue, !noAck)){ + connection.client->getMessageHandler()->empty(context); } } @@ -141,7 +141,7 @@ MessageHandlerImpl::open(const MethodContext&, } void -MessageHandlerImpl::qos(const MethodContext&, +MessageHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/ ) @@ -152,7 +152,7 @@ MessageHandlerImpl::qos(const MethodContext&, channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); - connection.client->getMessageHandler()->ok(channel.getId()); + connection.client->getMessageHandler()->ok(context); } void diff --git a/cpp/lib/broker/MessageHandlerImpl.h b/cpp/lib/broker/MessageHandlerImpl.h index d97d29153a..0eb9e119f5 100644 --- a/cpp/lib/broker/MessageHandlerImpl.h +++ b/cpp/lib/broker/MessageHandlerImpl.h @@ -37,62 +37,62 @@ class MessageHandlerImpl : public qpid::framing::AMQP_ServerOperations::MessageH MessageHandlerImpl(Channel& ch, Connection& c, Broker& b) : channel(ch), connection(c), broker(b) {} - void append(const qpid::framing::MethodContext&, + void append(const framing::MethodContext&, const std::string& reference, const std::string& bytes ); - void cancel(const qpid::framing::MethodContext&, + void cancel(const framing::MethodContext&, const std::string& destination ); - void checkpoint(const qpid::framing::MethodContext&, + void checkpoint(const framing::MethodContext&, const std::string& reference, const std::string& identifier ); - void close(const qpid::framing::MethodContext&, + void close(const framing::MethodContext&, const std::string& reference ); - void consume(const qpid::framing::MethodContext&, + void consume(const framing::MethodContext&, u_int16_t ticket, const std::string& queue, const std::string& destination, bool noLocal, bool noAck, bool exclusive, - const qpid::framing::FieldTable& filter ); + const framing::FieldTable& filter ); - void empty( const qpid::framing::MethodContext& ); + void empty( const framing::MethodContext& ); - void get(const qpid::framing::MethodContext&, + void get(const framing::MethodContext&, u_int16_t ticket, const std::string& queue, const std::string& destination, bool noAck ); - void offset(const qpid::framing::MethodContext&, + void offset(const framing::MethodContext&, u_int64_t value ); - void ok( const qpid::framing::MethodContext& ); + void ok( const framing::MethodContext& ); - void open(const qpid::framing::MethodContext&, + void open(const framing::MethodContext&, const std::string& reference ); - void qos(const qpid::framing::MethodContext&, + void qos(const framing::MethodContext&, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global ); - void recover(const qpid::framing::MethodContext&, + void recover(const framing::MethodContext&, bool requeue ); - void reject(const qpid::framing::MethodContext&, + void reject(const framing::MethodContext&, u_int16_t code, const std::string& text ); - void resume(const qpid::framing::MethodContext&, + void resume(const framing::MethodContext&, const std::string& reference, const std::string& identifier ); - void transfer(const qpid::framing::MethodContext&, + void transfer(const framing::MethodContext&, u_int16_t ticket, const std::string& destination, bool redelivered, @@ -113,8 +113,8 @@ class MessageHandlerImpl : public qpid::framing::AMQP_ServerOperations::MessageH const std::string& appId, const std::string& transactionId, const std::string& securityToken, - const qpid::framing::FieldTable& applicationHeaders, - qpid::framing::Content body ); + const framing::FieldTable& applicationHeaders, + framing::Content body ); }; }} // namespace qpid::broker diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp index af26990d8a..d9edb2f390 100644 --- a/cpp/lib/client/ClientChannel.cpp +++ b/cpp/lib/client/ClientChannel.cpp @@ -256,6 +256,16 @@ void Channel::rollback(){ sendAndReceive(frame, method_bodies.tx_rollback_ok); } +void Channel::handleRequest(AMQRequestBody::shared_ptr body) { + // FIXME aconway 2007-01-19: request/response handling. + handleMethod(body); +} + +void Channel::handleResponse(AMQResponseBody::shared_ptr body) { + // FIXME aconway 2007-01-19: request/response handling. + handleMethod(body); +} + void Channel::handleMethod(AMQMethodBody::shared_ptr body){ //channel.flow, channel.close, basic.deliver, basic.return or a response to a synchronous request if(responses.isWaiting()){ diff --git a/cpp/lib/client/ClientChannel.h b/cpp/lib/client/ClientChannel.h index 5beda0296e..e7bab8b4ee 100644 --- a/cpp/lib/client/ClientChannel.h +++ b/cpp/lib/client/ClientChannel.h @@ -67,7 +67,9 @@ namespace client { * * \ingroup clientapi */ - class Channel : private virtual qpid::framing::BodyHandler, public virtual qpid::sys::Runnable{ + class Channel : private virtual framing::BodyHandler, + public virtual sys::Runnable + { struct Consumer{ MessageListener* listener; int ackMode; @@ -78,36 +80,38 @@ namespace client { u_int16_t id; Connection* con; - qpid::sys::Thread dispatcher; - qpid::framing::OutputHandler* out; + sys::Thread dispatcher; + framing::OutputHandler* out; IncomingMessage* incoming; ResponseHandler responses; std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume IncomingMessage* retrieved;//holds response to basic.get - qpid::sys::Monitor dispatchMonitor; - qpid::sys::Monitor retrievalMonitor; + sys::Monitor dispatchMonitor; + sys::Monitor retrievalMonitor; std::map<std::string, Consumer*> consumers; ReturnedMessageHandler* returnsHandler; bool closed; u_int16_t prefetch; const bool transactional; - qpid::framing::ProtocolVersion version; + framing::ProtocolVersion version; void enqueue(); void retrieve(Message& msg); IncomingMessage* dequeue(); void dispatch(); void stop(); - void sendAndReceive(qpid::framing::AMQFrame* frame, const qpid::framing::AMQMethodBody& body); + void sendAndReceive(framing::AMQFrame* frame, const framing::AMQMethodBody& body); void deliver(Consumer* consumer, Message& msg); void setQos(); void cancelAll(); - virtual void handleMethod(qpid::framing::AMQMethodBody::shared_ptr body); - virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body); - virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body); - virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body); + virtual void handleMethod(framing::AMQMethodBody::shared_ptr body); + virtual void handleHeader(framing::AMQHeaderBody::shared_ptr body); + virtual void handleContent(framing::AMQContentBody::shared_ptr body); + virtual void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body); + void handleRequest(framing::AMQRequestBody::shared_ptr); + void handleResponse(framing::AMQResponseBody::shared_ptr); public: /** @@ -185,7 +189,7 @@ namespace client { * is received from the broker */ void bind(const Exchange& exchange, const Queue& queue, const std::string& key, - const qpid::framing::FieldTable& args, bool synch = true); + const framing::FieldTable& args, bool synch = true); /** * Creates a 'consumer' for a queue. Messages in (or arriving * at) that queue will be delivered to consumers @@ -216,7 +220,7 @@ namespace client { void consume( Queue& queue, std::string& tag, MessageListener* listener, int ackMode = NO_ACK, bool noLocal = false, bool synch = true, - const qpid::framing::FieldTable* fields = 0); + const framing::FieldTable* fields = 0); /** * Cancels a subscription previously set up through a call to consume(). diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp index dd1e372095..1ae317db62 100644 --- a/cpp/lib/client/Connection.cpp +++ b/cpp/lib/client/Connection.cpp @@ -184,6 +184,16 @@ void Connection::handleFrame(AMQFrame* frame){ } } +void Connection::handleRequest(AMQRequestBody::shared_ptr body) { + // FIXME aconway 2007-01-19: request/response handling. + handleMethod(body); +} + +void Connection::handleResponse(AMQResponseBody::shared_ptr body) { + // FIXME aconway 2007-01-19: request/response handling. + handleMethod(body); +} + void Connection::handleMethod(AMQMethodBody::shared_ptr body){ //connection.close, basic.deliver, basic.return or a response to a synchronous request if(responses.isWaiting()){ diff --git a/cpp/lib/client/Connection.h b/cpp/lib/client/Connection.h index 21e2fb90a2..9c9b067f88 100644 --- a/cpp/lib/client/Connection.h +++ b/cpp/lib/client/Connection.h @@ -66,7 +66,8 @@ namespace client { class Connection : public virtual qpid::framing::InputHandler, public virtual qpid::sys::TimeoutHandler, public virtual qpid::sys::ShutdownHandler, - private virtual qpid::framing::BodyHandler{ + private virtual qpid::framing::BodyHandler + { typedef std::map<int, Channel*>::iterator iterator; @@ -80,20 +81,25 @@ namespace client { qpid::framing::OutputHandler* out; ResponseHandler responses; volatile bool closed; - qpid::framing::ProtocolVersion version; - qpid::framing::Requester requester; - qpid::framing::Responder responder; + framing::ProtocolVersion version; + framing::Requester requester; + framing::Responder responder; - void channelException(Channel* channel, qpid::framing::AMQMethodBody* body, QpidError& e); + void channelException(Channel* channel, framing::AMQMethodBody* body, QpidError& e); void error(int code, const std::string& msg, int classid = 0, int methodid = 0); void closeChannel(Channel* channel, u_int16_t code, std::string& text, u_int16_t classId = 0, u_int16_t methodId = 0); - void sendAndReceive(qpid::framing::AMQFrame* frame, const qpid::framing::AMQMethodBody& body); - - virtual void handleMethod(qpid::framing::AMQMethodBody::shared_ptr body); - virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body); - virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body); - virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body); - void handleFrame(qpid::framing::AMQFrame* frame); + void sendAndReceive(framing::AMQFrame* frame, const framing::AMQMethodBody& body); + + // FIXME aconway 2007-01-19: Use channel(0) not connection + // to handle channel 0 requests. Remove handler methods. + // + void handleRequest(framing::AMQRequestBody::shared_ptr); + void handleResponse(framing::AMQResponseBody::shared_ptr); + void handleMethod(framing::AMQMethodBody::shared_ptr); + void handleHeader(framing::AMQHeaderBody::shared_ptr); + void handleContent(framing::AMQContentBody::shared_ptr); + void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr); + void handleFrame(framing::AMQFrame* frame); public: /** @@ -110,7 +116,7 @@ namespace client { * client will accept. Optional and defaults to 65536. */ Connection( bool debug = false, u_int32_t max_frame_size = 65536, - qpid::framing::ProtocolVersion* _version = &(qpid::framing::highestProtocolVersion)); + framing::ProtocolVersion* _version = &(framing::highestProtocolVersion)); ~Connection(); /** @@ -163,7 +169,7 @@ namespace client { */ void removeChannel(Channel* channel); - virtual void received(qpid::framing::AMQFrame* frame); + virtual void received(framing::AMQFrame* frame); virtual void idleOut(); virtual void idleIn(); diff --git a/cpp/lib/common/Makefile.am b/cpp/lib/common/Makefile.am index 813c49135e..eefff79d6f 100644 --- a/cpp/lib/common/Makefile.am +++ b/cpp/lib/common/Makefile.am @@ -65,6 +65,7 @@ libqpidcommon_la_SOURCES = \ $(framing)/AMQMethodBody.cpp \ $(framing)/BasicHeaderProperties.cpp \ $(framing)/BodyHandler.cpp \ + $(framing)/ChannelAdapter.cpp \ $(framing)/Buffer.cpp \ $(framing)/FieldTable.cpp \ $(framing)/FramingContent.cpp \ @@ -96,6 +97,7 @@ nobase_pkginclude_HEADERS = \ $(framing)/AMQMethodBody.h \ $(framing)/BasicHeaderProperties.h \ $(framing)/BodyHandler.h \ + $(framing)/ChannelAdapter.h \ $(framing)/Buffer.h \ $(framing)/FieldTable.h \ $(framing)/FramingContent.h \ @@ -119,7 +121,7 @@ nobase_pkginclude_HEADERS = \ sys/Monitor.h \ sys/Mutex.h \ sys/Runnable.h \ - sys/SessionContext.h \ + sys/ConnectionOutputHandler.h \ sys/ConnectionInputHandler.h \ sys/ConnectionInputHandlerFactory.h \ sys/ShutdownHandler.h \ diff --git a/cpp/lib/common/framing/AMQFrame.cpp b/cpp/lib/common/framing/AMQFrame.cpp index 8ac5199c45..c6837af668 100644 --- a/cpp/lib/common/framing/AMQFrame.cpp +++ b/cpp/lib/common/framing/AMQFrame.cpp @@ -28,15 +28,15 @@ using namespace qpid::framing; AMQP_MethodVersionMap AMQFrame::versionMap; -AMQFrame::AMQFrame(qpid::framing::ProtocolVersion& _version): +AMQFrame::AMQFrame(const qpid::framing::ProtocolVersion& _version): version(_version) {} -AMQFrame::AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t _channel, AMQBody* _body) : +AMQFrame::AMQFrame(const qpid::framing::ProtocolVersion& _version, u_int16_t _channel, AMQBody* _body) : version(_version), channel(_channel), body(_body) {} -AMQFrame::AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t _channel, const AMQBody::shared_ptr& _body) : +AMQFrame::AMQFrame(const qpid::framing::ProtocolVersion& _version, u_int16_t _channel, const AMQBody::shared_ptr& _body) : version(_version), channel(_channel), body(_body) {} diff --git a/cpp/lib/common/framing/AMQFrame.h b/cpp/lib/common/framing/AMQFrame.h index c27de70e5a..f3c3232d56 100644 --- a/cpp/lib/common/framing/AMQFrame.h +++ b/cpp/lib/common/framing/AMQFrame.h @@ -41,9 +41,9 @@ namespace framing { class AMQFrame : virtual public AMQDataBlock { public: - AMQFrame(qpid::framing::ProtocolVersion& _version = highestProtocolVersion); - AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t channel, AMQBody* body); - AMQFrame(qpid::framing::ProtocolVersion& _version, u_int16_t channel, const AMQBody::shared_ptr& body); + AMQFrame(const qpid::framing::ProtocolVersion& _version = highestProtocolVersion); + AMQFrame(const qpid::framing::ProtocolVersion& _version, u_int16_t channel, AMQBody* body); + AMQFrame(const qpid::framing::ProtocolVersion& _version, u_int16_t channel, const AMQBody::shared_ptr& body); virtual ~AMQFrame(); virtual void encode(Buffer& buffer); virtual bool decode(Buffer& buffer); diff --git a/cpp/lib/common/framing/AMQMethodBody.cpp b/cpp/lib/common/framing/AMQMethodBody.cpp index 0c77a1c64a..73b729b945 100644 --- a/cpp/lib/common/framing/AMQMethodBody.cpp +++ b/cpp/lib/common/framing/AMQMethodBody.cpp @@ -18,6 +18,7 @@ * under the License. * */ +#include <AMQFrame.h> #include <AMQMethodBody.h> #include <QpidError.h> #include "AMQP_MethodVersionMap.h" @@ -59,5 +60,8 @@ void AMQMethodBody::decode(Buffer& buffer, u_int32_t /*size*/) { decodeContent(buffer); } +void AMQMethodBody::send(const MethodContext& context) { + context.out->send(new AMQFrame(version, context.channelId, this)); +} }} // namespace qpid::framing diff --git a/cpp/lib/common/framing/AMQMethodBody.h b/cpp/lib/common/framing/AMQMethodBody.h index 9f859046f8..ff09ee60e1 100644 --- a/cpp/lib/common/framing/AMQMethodBody.h +++ b/cpp/lib/common/framing/AMQMethodBody.h @@ -53,6 +53,13 @@ class AMQMethodBody : public AMQBody virtual void invoke(AMQP_ServerOperations&, const MethodContext&); bool match(AMQMethodBody* other) const; + + /** + * Wrap this method in a frame and send using the current context. + * Note the frame takes ownership of the body, it will be deleted. + */ + virtual void send(const MethodContext& context); + protected: static u_int32_t baseSize() { return 4; } diff --git a/cpp/lib/common/framing/AMQResponseBody.cpp b/cpp/lib/common/framing/AMQResponseBody.cpp index c64b1325d6..dffbb62aca 100644 --- a/cpp/lib/common/framing/AMQResponseBody.cpp +++ b/cpp/lib/common/framing/AMQResponseBody.cpp @@ -16,6 +16,7 @@ * */ +#include "AMQFrame.h" #include "AMQResponseBody.h" #include "AMQP_MethodVersionMap.h" @@ -61,5 +62,11 @@ void AMQResponseBody::printPrefix(std::ostream& out) const { << ",batch=" << data.batchOffset << "): "; } +void AMQResponseBody::send(const MethodContext& context) { + setRequestId(context.requestId); + assert(context.out); + context.out->send( + new AMQFrame(version, context.channelId, this)); +} }} // namespace qpid::framing diff --git a/cpp/lib/common/framing/AMQResponseBody.h b/cpp/lib/common/framing/AMQResponseBody.h index 6528613a12..2520a481f2 100644 --- a/cpp/lib/common/framing/AMQResponseBody.h +++ b/cpp/lib/common/framing/AMQResponseBody.h @@ -65,6 +65,11 @@ class AMQResponseBody : public AMQMethodBody ResponseId getResponseId() { return data.responseId; } RequestId getRequestId() { return data.requestId; } BatchOffset getBatchOffset() { return data.batchOffset; } + void setResponseId(ResponseId id) { data.responseId = id; } + void setRequestId(RequestId id) { data.requestId = id; } + void setBatchOffset(BatchOffset id) { data.batchOffset = id; } + + virtual void send(const MethodContext& context); protected: static const u_int32_t baseSize() { return AMQMethodBody::baseSize()+20; } diff --git a/cpp/lib/common/framing/BodyHandler.cpp b/cpp/lib/common/framing/BodyHandler.cpp index 72ba82468f..5dd0c0c23d 100644 --- a/cpp/lib/common/framing/BodyHandler.cpp +++ b/cpp/lib/common/framing/BodyHandler.cpp @@ -58,22 +58,3 @@ void BodyHandler::handleBody(shared_ptr<AMQBody> body) { } } -void BodyHandler::handleRequest(AMQRequestBody::shared_ptr request) { - responder.received(request->getData()); - handleMethod(request); -} - -void BodyHandler::handleResponse(AMQResponseBody::shared_ptr response) { - handleMethod(response); - requester.processed(response->getData()); -} - -void BodyHandler::assertChannelZero(u_int16_t id) { - if (id != 0) - throw ConnectionException(504, "Invalid channel id, not 0"); -} - -void BodyHandler::assertChannelNonZero(u_int16_t id) { - if (id == 0) - throw ConnectionException(504, "Invalid channel id 0"); -} diff --git a/cpp/lib/common/framing/BodyHandler.h b/cpp/lib/common/framing/BodyHandler.h index c9c74e2b3f..cb3f0997b0 100644 --- a/cpp/lib/common/framing/BodyHandler.h +++ b/cpp/lib/common/framing/BodyHandler.h @@ -38,35 +38,21 @@ class AMQContentBody; class AMQHeartbeatBody; /** - * Base class for client and broker channel handlers. - * - * Handles request/response id management common to client and broker. - * Derived classes provide remaining client/broker specific handling. + * Interface to handle incoming frame bodies. + * Derived classes provide logic for each frame type. */ class BodyHandler { public: virtual ~BodyHandler(); - - void handleBody(boost::shared_ptr<AMQBody> body); + virtual void handleBody(boost::shared_ptr<AMQBody> body); protected: - virtual void handleRequest(boost::shared_ptr<AMQRequestBody>); - virtual void handleResponse(boost::shared_ptr<AMQResponseBody>); - + virtual void handleRequest(boost::shared_ptr<AMQRequestBody>) = 0; + virtual void handleResponse(boost::shared_ptr<AMQResponseBody>) = 0; virtual void handleMethod(boost::shared_ptr<AMQMethodBody>) = 0; virtual void handleHeader(boost::shared_ptr<AMQHeaderBody>) = 0; virtual void handleContent(boost::shared_ptr<AMQContentBody>) = 0; virtual void handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) = 0; - - protected: - /** Throw protocol exception if this is not channel 0. */ - static void assertChannelZero(u_int16_t id); - /** Throw protocol exception if this is channel 0. */ - static void assertChannelNonZero(u_int16_t id); - - private: - Requester requester; - Responder responder; }; }} diff --git a/cpp/lib/common/framing/ChannelAdapter.cpp b/cpp/lib/common/framing/ChannelAdapter.cpp new file mode 100644 index 0000000000..cf6fea1455 --- /dev/null +++ b/cpp/lib/common/framing/ChannelAdapter.cpp @@ -0,0 +1,70 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "ChannelAdapter.h" +#include "AMQFrame.h" + +namespace qpid { +namespace framing { + +void ChannelAdapter::send(AMQFrame* frame) { + AMQBody::shared_ptr body = frame->getBody(); + switch (body->type()) { + case REQUEST_BODY: { + AMQRequestBody::shared_ptr request = + boost::shared_polymorphic_downcast<AMQRequestBody>(body); + requester.sending(request->getData()); + break; + } + case RESPONSE_BODY: { + AMQResponseBody::shared_ptr response = + boost::shared_polymorphic_downcast<AMQResponseBody>(body); + responder.sending(response->getData()); + break; + } + } + out.send(frame); +} + +void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) { + responder.received(request->getData()); + MethodContext context(id, &out, request->getRequestId()); + handleMethodInContext(request, context); +} + +void ChannelAdapter::handleResponse(AMQResponseBody::shared_ptr response) { + handleMethod(response); + requester.processed(response->getData()); +} + +void ChannelAdapter::handleMethod(AMQMethodBody::shared_ptr method) { + MethodContext context(id, this); + handleMethodInContext(method, context); +} + +void ChannelAdapter::assertChannelZero(u_int16_t id) { + if (id != 0) + throw ConnectionException(504, "Invalid channel id, not 0"); +} + +void ChannelAdapter::assertChannelNonZero(u_int16_t id) { + if (id == 0) + throw ConnectionException(504, "Invalid channel id 0"); +} + +}} // namespace qpid::framing diff --git a/cpp/lib/common/framing/ChannelAdapter.h b/cpp/lib/common/framing/ChannelAdapter.h new file mode 100644 index 0000000000..0652cc41bb --- /dev/null +++ b/cpp/lib/common/framing/ChannelAdapter.h @@ -0,0 +1,90 @@ +#ifndef _ChannelAdapter_ +#define _ChannelAdapter_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <boost/shared_ptr.hpp> + +#include "BodyHandler.h" +#include "Requester.h" +#include "Responder.h" +#include "OutputHandler.h" + +namespace qpid { +namespace framing { + +class MethodContext; + +/** + * Base class for client and broker channel adapters. + * + * As BodyHandler: + * - Creates MethodContext and dispatches methods+context to derived class. + * - Updates request/response ID data. + * + * As OutputHandler: + * - Updates request/resposne ID data. + * + */ +class ChannelAdapter : public BodyHandler, public OutputHandler { + public: + /** + *@param output Processed frames are forwarded to this handler. + */ + ChannelAdapter(OutputHandler& output, ChannelId channelId) + : id(channelId), out(output) {} + + ChannelId getId() { return id; } + + /** + * Do request/response-id processing and then forward to + * handler provided to constructor. Response frames should + * have their request-id set before calling send. + */ + void send(AMQFrame* frame); + + void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody>); + void handleRequest(boost::shared_ptr<qpid::framing::AMQRequestBody>); + void handleResponse(boost::shared_ptr<qpid::framing::AMQResponseBody>); + + protected: + /** Throw protocol exception if this is not channel 0. */ + static void assertChannelZero(u_int16_t id); + /** Throw protocol exception if this is channel 0. */ + static void assertChannelNonZero(u_int16_t id); + + virtual void handleMethodInContext( + boost::shared_ptr<qpid::framing::AMQMethodBody> method, + const MethodContext& context) = 0; + + ChannelId id; + + private: + Requester requester; + Responder responder; + OutputHandler& out; +}; + +}} + + +#endif diff --git a/cpp/lib/common/framing/MethodContext.h b/cpp/lib/common/framing/MethodContext.h index 13d5f658ca..1aa4be8f1e 100644 --- a/cpp/lib/common/framing/MethodContext.h +++ b/cpp/lib/common/framing/MethodContext.h @@ -19,6 +19,9 @@ * */ +#include "OutputHandler.h" +#include "ProtocolVersion.h" + namespace qpid { namespace framing { @@ -26,11 +29,14 @@ class BodyHandler; /** * Invocation context for an AMQP method. + * Some of the context information is related to the channel, some + * to the specific invocation - e.g. requestId. + * * All generated proxy and handler functions take a MethodContext parameter. * - * The user calling on a broker proxy can simply pass an integer - * channel ID, it will implicitly be converted to an appropriate context. - * + * The user does not need to create MethodContext objects explicitly, + * the constructor will implicitly create one from a channel ID. + * * Other context members are for internal use. */ struct MethodContext @@ -39,13 +45,21 @@ struct MethodContext * Passing a integer channel-id in place of a MethodContext * will automatically construct the MethodContext. */ - MethodContext(ChannelId channel, RequestId request=0) - : channelId(channel), requestId(request) {} + MethodContext( + ChannelId channel, OutputHandler* output=0, RequestId request=0) + : channelId(channel), out(output), requestId(request){} + + /** \internal Channel on which the method is sent. */ + const ChannelId channelId; + + /** Output handler for responses in this context */ + OutputHandler* out; + + /** \internal If we are in the context of processing an incoming request, + * this is the ID. Otherwise it is 0. + */ + const RequestId requestId; - /** Channel on which the method is sent. */ - ChannelId channelId; - /** \internal For proxy response: the original request or 0. */ - RequestId requestId; }; }} // namespace qpid::framing diff --git a/cpp/lib/common/framing/OutputHandler.h b/cpp/lib/common/framing/OutputHandler.h index 2e01e34df2..9ffd4227d8 100644 --- a/cpp/lib/common/framing/OutputHandler.h +++ b/cpp/lib/common/framing/OutputHandler.h @@ -22,10 +22,10 @@ * */ #include <boost/noncopyable.hpp> -#include <AMQFrame.h> namespace qpid { namespace framing { +class AMQFrame; class OutputHandler : private boost::noncopyable { public: diff --git a/cpp/lib/common/sys/ConnectionInputHandlerFactory.h b/cpp/lib/common/sys/ConnectionInputHandlerFactory.h index 5bb5e17704..af7d411928 100644 --- a/cpp/lib/common/sys/ConnectionInputHandlerFactory.h +++ b/cpp/lib/common/sys/ConnectionInputHandlerFactory.h @@ -26,7 +26,7 @@ namespace qpid { namespace sys { -class SessionContext; +class ConnectionOutputHandler; class ConnectionInputHandler; /** @@ -36,7 +36,7 @@ class ConnectionInputHandler; class ConnectionInputHandlerFactory : private boost::noncopyable { public: - virtual ConnectionInputHandler* create(SessionContext* ctxt) = 0; + virtual ConnectionInputHandler* create(ConnectionOutputHandler* ctxt) = 0; virtual ~ConnectionInputHandlerFactory(){} }; diff --git a/cpp/lib/common/sys/SessionContext.h b/cpp/lib/common/sys/ConnectionOutputHandler.h index 671e00774f..91849e1dfb 100644 --- a/cpp/lib/common/sys/SessionContext.h +++ b/cpp/lib/common/sys/ConnectionOutputHandler.h @@ -18,8 +18,8 @@ * under the License. * */ -#ifndef _SessionContext_ -#define _SessionContext_ +#ifndef _ConnectionOutputHandler_ +#define _ConnectionOutputHandler_ #include <OutputHandler.h> @@ -29,7 +29,7 @@ namespace sys { /** * Provides the output handler associated with a connection. */ -class SessionContext : public virtual qpid::framing::OutputHandler +class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler { public: virtual void close() = 0; diff --git a/cpp/lib/common/sys/apr/LFSessionContext.h b/cpp/lib/common/sys/apr/LFSessionContext.h index 8cf50b87ba..81cfc0efda 100644 --- a/cpp/lib/common/sys/apr/LFSessionContext.h +++ b/cpp/lib/common/sys/apr/LFSessionContext.h @@ -30,7 +30,7 @@ #include <AMQFrame.h> #include <Buffer.h> #include <sys/Monitor.h> -#include <sys/SessionContext.h> +#include <sys/ConnectionOutputHandler.h> #include <sys/ConnectionInputHandler.h> #include "APRSocket.h" @@ -40,7 +40,7 @@ namespace qpid { namespace sys { -class LFSessionContext : public virtual qpid::sys::SessionContext +class LFSessionContext : public virtual qpid::sys::ConnectionOutputHandler { const bool debug; APRSocket socket; diff --git a/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp b/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp index 787d12d6d1..548fbd1881 100644 --- a/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp +++ b/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp @@ -26,7 +26,7 @@ #include <boost/bind.hpp> #include <boost/scoped_ptr.hpp> -#include <sys/SessionContext.h> +#include <sys/ConnectionOutputHandler.h> #include <sys/ConnectionInputHandler.h> #include <sys/ConnectionInputHandlerFactory.h> #include <sys/Acceptor.h> diff --git a/cpp/lib/common/sys/posix/EventChannelConnection.h b/cpp/lib/common/sys/posix/EventChannelConnection.h index 1504e92651..da7b6dca27 100644 --- a/cpp/lib/common/sys/posix/EventChannelConnection.h +++ b/cpp/lib/common/sys/posix/EventChannelConnection.h @@ -23,7 +23,7 @@ #include "EventChannelThreads.h" #include "sys/Monitor.h" -#include "sys/SessionContext.h" +#include "sys/ConnectionOutputHandler.h" #include "sys/ConnectionInputHandler.h" #include "sys/AtomicCount.h" #include "framing/AMQFrame.h" @@ -34,13 +34,13 @@ namespace sys { class ConnectionInputHandlerFactory; /** - * Implements SessionContext and delegates to a ConnectionInputHandler + * Implements ConnectionOutputHandler and delegates to a ConnectionInputHandler * for a connection via the EventChannel. *@param readDescriptor file descriptor for reading. *@param writeDescriptor file descriptor for writing, * by default same as readDescriptor */ -class EventChannelConnection : public SessionContext { +class EventChannelConnection : public ConnectionOutputHandler { public: EventChannelConnection( EventChannelThreads::shared_ptr threads, @@ -50,7 +50,7 @@ class EventChannelConnection : public SessionContext { bool isTrace = false ); - // TODO aconway 2006-11-30: SessionContext::send should take auto_ptr + // TODO aconway 2006-11-30: ConnectionOutputHandler::send should take auto_ptr virtual void send(qpid::framing::AMQFrame* frame) { send(std::auto_ptr<qpid::framing::AMQFrame>(frame)); } diff --git a/cpp/lib/common/sys/posix/check.h b/cpp/lib/common/sys/posix/check.h index 5afbe8f5a8..57b5a5757c 100644 --- a/cpp/lib/common/sys/posix/check.h +++ b/cpp/lib/common/sys/posix/check.h @@ -45,7 +45,7 @@ class PosixError : public qpid::QpidError Exception* clone() const throw() { return new PosixError(*this); } - void throwSelf() { throw *this; } + void throwSelf() const { throw *this; } private: int errNo; diff --git a/cpp/tests/ChannelTest.cpp b/cpp/tests/ChannelTest.cpp index e7a02d631d..b31ff6a321 100644 --- a/cpp/tests/ChannelTest.cpp +++ b/cpp/tests/ChannelTest.cpp @@ -27,7 +27,7 @@ #include <iostream> #include <memory> #include <AMQP_HighestVersion.h> - +#include "AMQFrame.h" using namespace boost; using namespace qpid::broker; @@ -107,6 +107,9 @@ class ChannelTest : public CppUnit::TestCase handle(call); } + // Don't hide overloads. + using NullMessageStore::destroy; + void destroy(Message* msg) { MethodCall call = {"destroy", msg, ""}; diff --git a/cpp/tests/InMemoryContentTest.cpp b/cpp/tests/InMemoryContentTest.cpp index bd638dae66..1494518578 100644 --- a/cpp/tests/InMemoryContentTest.cpp +++ b/cpp/tests/InMemoryContentTest.cpp @@ -23,6 +23,7 @@ #include <AMQP_HighestVersion.h> #include <iostream> #include <list> +#include "AMQFrame.h" using std::list; using std::string; diff --git a/cpp/tests/LazyLoadedContentTest.cpp b/cpp/tests/LazyLoadedContentTest.cpp index 2075a6dd3a..624d2be3ff 100644 --- a/cpp/tests/LazyLoadedContentTest.cpp +++ b/cpp/tests/LazyLoadedContentTest.cpp @@ -25,6 +25,7 @@ #include <iostream> #include <list> #include <sstream> +#include "AMQFrame.h" using std::list; using std::string; diff --git a/cpp/tests/MessageBuilderTest.cpp b/cpp/tests/MessageBuilderTest.cpp index 88e8318832..21f5935218 100644 --- a/cpp/tests/MessageBuilderTest.cpp +++ b/cpp/tests/MessageBuilderTest.cpp @@ -71,6 +71,9 @@ class MessageBuilderTest : public CppUnit::TestCase } } + // Don't hide overloads. + using NullMessageStore::destroy; + void destroy(Message* msg) { CPPUNIT_ASSERT(msg->getPersistenceId()); diff --git a/cpp/tests/MessageTest.cpp b/cpp/tests/MessageTest.cpp index bcf3ad8064..8bb570e598 100644 --- a/cpp/tests/MessageTest.cpp +++ b/cpp/tests/MessageTest.cpp @@ -22,6 +22,7 @@ #include <qpid_test_plugin.h> #include <iostream> #include <AMQP_HighestVersion.h> +#include "AMQFrame.h" using namespace boost; using namespace qpid::broker; diff --git a/cpp/tests/MockConnectionInputHandler.h b/cpp/tests/MockConnectionInputHandler.h index 522df0900f..b039e244d9 100644 --- a/cpp/tests/MockConnectionInputHandler.h +++ b/cpp/tests/MockConnectionInputHandler.h @@ -89,7 +89,7 @@ struct MockConnectionInputHandler : public qpid::sys::ConnectionInputHandler { struct MockConnectionInputHandlerFactory : public qpid::sys::ConnectionInputHandlerFactory { MockConnectionInputHandlerFactory() : handler(0) {} - qpid::sys::ConnectionInputHandler* create(qpid::sys::SessionContext*) { + qpid::sys::ConnectionInputHandler* create(qpid::sys::ConnectionOutputHandler*) { qpid::sys::Monitor::ScopedLock lock(monitor); handler = new MockConnectionInputHandler(); monitor.notifyAll(); |