diff options
-rw-r--r-- | cpp/broker/inc/SessionHandlerImpl.h | 2 | ||||
-rw-r--r-- | cpp/broker/src/SessionHandlerImpl.cpp | 39 | ||||
-rw-r--r-- | cpp/common/framing/generated/stylesheets/amqp_client.xsl | 26 | ||||
-rw-r--r-- | cpp/common/framing/generated/stylesheets/amqp_server.xsl | 80 |
4 files changed, 89 insertions, 58 deletions
diff --git a/cpp/broker/inc/SessionHandlerImpl.h b/cpp/broker/inc/SessionHandlerImpl.h index 14a6404c78..167bf0cc23 100644 --- a/cpp/broker/inc/SessionHandlerImpl.h +++ b/cpp/broker/inc/SessionHandlerImpl.h @@ -23,6 +23,7 @@ #include <vector> #include <exception> #include "AMQFrame.h" +#include "AMQP_ClientProxy.h" #include "AMQP_ServerOperations.h" #include "AutoDelete.h" #include "ExchangeRegistry.h" @@ -64,6 +65,7 @@ class SessionHandlerImpl : public virtual qpid::io::SessionHandler, typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; qpid::io::SessionContext* context; + qpid::framing::AMQP_ClientProxy client; QueueRegistry* queues; ExchangeRegistry* const exchanges; AutoDelete* const cleaner; diff --git a/cpp/broker/src/SessionHandlerImpl.cpp b/cpp/broker/src/SessionHandlerImpl.cpp index 19e243a01b..a75b8fcf0f 100644 --- a/cpp/broker/src/SessionHandlerImpl.cpp +++ b/cpp/broker/src/SessionHandlerImpl.cpp @@ -40,6 +40,7 @@ SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context, basicHandler(new BasicHandlerImpl(this)), exchangeHandler(new ExchangeHandlerImpl(this)), queueHandler(new QueueHandlerImpl(this)), + client(context), framemax(65536), heartbeat(0){ @@ -90,9 +91,9 @@ void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){ }catch(ChannelException& e){ channels[channel]->close(); channels.erase(channel); - context->send(new AMQFrame(channel, new ChannelCloseBody(e.code, e.text, method->amqpClassId(), method->amqpMethodId()))); + client.getChannel().close(channel, e.code, e.text, method->amqpClassId(), method->amqpMethodId()); }catch(ConnectionException& e){ - context->send(new AMQFrame(0, new ConnectionCloseBody(e.code, e.text, method->amqpClassId(), method->amqpMethodId()))); + client.getConnection().close(0, e.code, e.text, method->amqpClassId(), method->amqpMethodId()); } break; @@ -116,7 +117,7 @@ void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* header){ FieldTable properties; string mechanisms("PLAIN"); string locales("en_US"); - context->send(new AMQFrame(0, new ConnectionStartBody(8, 0, properties, mechanisms, locales))); + client.getConnection().start(0, 8, 0, properties, mechanisms, locales); } void SessionHandlerImpl::idleOut(){ @@ -156,7 +157,7 @@ void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr body){ void SessionHandlerImpl::ConnectionHandlerImpl::startOk(u_int16_t channel, FieldTable& clientProperties, string& mechanism, string& response, string& locale){ - parent->context->send(new AMQFrame(0, new ConnectionTuneBody(100, parent->framemax, parent->heartbeat))); + parent->client.getConnection().tune(0, 100, parent->framemax, parent->heartbeat); } void SessionHandlerImpl::ConnectionHandlerImpl::secureOk(u_int16_t channel, string& response){} @@ -168,13 +169,13 @@ void SessionHandlerImpl::ConnectionHandlerImpl::tuneOk(u_int16_t channel, u_int1 void SessionHandlerImpl::ConnectionHandlerImpl::open(u_int16_t channel, string& virtualHost, string& capabilities, bool insist){ string knownhosts; - parent->context->send(new AMQFrame(0, new ConnectionOpenOkBody(knownhosts))); + parent->client.getConnection().openOk(0, knownhosts); } void SessionHandlerImpl::ConnectionHandlerImpl::close(u_int16_t channel, u_int16_t replyCode, string& replyText, u_int16_t classId, u_int16_t methodId){ - parent->context->send(new AMQFrame(0, new ConnectionCloseOkBody())); + parent->client.getConnection().closeOk(0); parent->context->close(); } @@ -186,7 +187,7 @@ void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t channel){ void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, string& outOfBand){ parent->channels[channel] = new Channel(parent->context, channel, parent->framemax); - parent->context->send(new AMQFrame(channel, new ChannelOpenOkBody())); + parent->client.getChannel().openOk(channel); } void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t channel, bool active){} @@ -198,7 +199,7 @@ void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t parent->channels.erase(channel); c->close(); delete c; - parent->context->send(new AMQFrame(channel, new ChannelCloseOkBody())); + parent->client.getChannel().closeOk(channel); } void SessionHandlerImpl::ChannelHandlerImpl::closeOk(u_int16_t channel){} @@ -230,7 +231,7 @@ void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16 } parent->exchanges->getLock()->release(); if(!nowait){ - parent->context->send(new AMQFrame(channel, new ExchangeDeclareOkBody())); + parent->client.getExchange().declareOk(channel); } } @@ -239,11 +240,8 @@ void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16 parent->exchanges->getLock()->acquire(); parent->exchanges->destroy(exchange); parent->exchanges->getLock()->release(); - if(!nowait) parent->context->send(new AMQFrame(channel, new ExchangeDeleteOkBody())); + if(!nowait) parent->client.getExchange().deleteOk(channel); } - - - void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t ticket, string& name, bool passive, bool durable, bool exclusive, @@ -271,8 +269,7 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t } if(!nowait){ name = queue->getName(); - QueueDeclareOkBody* response = new QueueDeclareOkBody(name, queue->getMessageCount(), queue->getConsumerCount()); - parent->context->send(new AMQFrame(channel, response)); + parent->client.getQueue().declareOk(channel, name, queue->getMessageCount(), queue->getConsumerCount()); } } @@ -285,7 +282,7 @@ void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t tic if(exchange){ if(routingKey.size() == 0 && queueName.size() == 0) routingKey = queue->getName(); exchange->bind(queue, routingKey, &arguments); - if(!nowait) parent->context->send(new AMQFrame(channel, new QueueBindOkBody())); + if(!nowait) parent->client.getQueue().bindOk(channel); }else{ throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName); } @@ -295,7 +292,7 @@ void SessionHandlerImpl::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t ti Queue::shared_ptr queue = parent->getQueue(queueName, channel); int count = queue->purge(); - if(!nowait) parent->context->send(new AMQFrame(channel, new QueuePurgeOkBody(count))); + if(!nowait) parent->client.getQueue().purgeOk(channel, count); } void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t ticket, string& queue, @@ -316,7 +313,7 @@ void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t count = q->getMessageCount(); parent->queues->destroy(queue); } - if(!nowait) parent->context->send(new AMQFrame(channel, new QueueDeleteOkBody(count))); + if(!nowait) parent->client.getQueue().deleteOk(channel, count); } @@ -327,7 +324,7 @@ void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t pref //TODO: channel doesn't do anything with these qos parameters yet parent->channels[channel]->setPrefetchSize(prefetchSize); parent->channels[channel]->setPrefetchCount(prefetchCount); - parent->context->send(new AMQFrame(channel, new BasicQosOkBody())); + parent->client.getBasic().qosOk(channel); } void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_t ticket, @@ -344,7 +341,7 @@ void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_ try{ channel->consume(consumerTag, queue, !noAck, exclusive, noLocal ? parent : 0); - if(!nowait) parent->context->send(new AMQFrame(channelId, new BasicConsumeOkBody(consumerTag))); + if(!nowait) parent->client.getBasic().consumeOk(channelId, consumerTag); //allow messages to be dispatched if required as there is now a consumer: queue->dispatch(); @@ -357,7 +354,7 @@ void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_ void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, string& consumerTag, bool nowait){ parent->channels[channel]->cancel(consumerTag); - if(!nowait) parent->context->send(new AMQFrame(channel, new BasicCancelOkBody(consumerTag))); + if(!nowait) parent->client.getBasic().cancelOk(channel, consumerTag); } void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t ticket, diff --git a/cpp/common/framing/generated/stylesheets/amqp_client.xsl b/cpp/common/framing/generated/stylesheets/amqp_client.xsl index f0fa3d7890..13a912a926 100644 --- a/cpp/common/framing/generated/stylesheets/amqp_client.xsl +++ b/cpp/common/framing/generated/stylesheets/amqp_client.xsl @@ -11,11 +11,11 @@ --> <xsl:template match="amqp" mode="client_h"> <xsl:param name="domain-cpp-table"/> - <xsl:result-document href="AMQP_Client.h" format="textFormat"> + <xsl:result-document href="AMQP_ServerProxy.h" format="textFormat"> <xsl:value-of select="amqp:copyright()"/> <xsl:text> -#ifndef _AMQP_Client_ -#define _AMQP_Client_ +#ifndef _AMQP_ServerProxy_ +#define _AMQP_ServerProxy_ #include "AMQP_ServerOperations.h" #include "FieldTable.h" @@ -24,13 +24,13 @@ namespace qpid { namespace framing { -class AMQP_Client : virtual public AMQP_ServerOperations +class AMQP_ServerProxy : virtual public AMQP_ServerOperations { OutputHandler* out; public: - AMQP_Client(OutputHandler* _out); - virtual ~AMQP_Client() {}

</xsl:text> + AMQP_ServerProxy(OutputHandler* _out); + virtual ~AMQP_ServerProxy() {}

</xsl:text> <xsl:for-each select="class"> <xsl:variable name="class" select="amqp:cpp-class-name(@name)"/> <xsl:if test="doc"> @@ -76,7 +76,7 @@ class AMQP_Client : virtual public AMQP_ServerOperations </xsl:for-each> <xsl:text> }; /* class </xsl:text><xsl:value-of select="$class"/><xsl:text> */
</xsl:text> </xsl:for-each> - <xsl:text>}; /* class AMQP_Client */ + <xsl:text>}; /* class AMQP_ServerProxy */ } /* namespace framing */ } /* namespace qpid */ @@ -94,16 +94,16 @@ class AMQP_Client : virtual public AMQP_ServerOperations --> <xsl:template match="amqp" mode="client_cpp"> <xsl:param name="domain-cpp-table"/> - <xsl:result-document href="AMQP_Client.cpp" format="textFormat"> + <xsl:result-document href="AMQP_ServerProxy.cpp" format="textFormat"> <xsl:value-of select="amqp:copyright()"/> <xsl:text> -#include "AMQP_Client.h" +#include "AMQP_ServerProxy.h" namespace qpid { namespace framing { -AMQP_Client::AMQP_Client(OutputHandler* _out) : +AMQP_ServerProxy::AMQP_ServerProxy(OutputHandler* _out) : out(_out) { }

</xsl:text> @@ -111,15 +111,15 @@ AMQP_Client::AMQP_Client(OutputHandler* _out) : <xsl:variable name="class" select="amqp:cpp-class-name(@name)"/> <xsl:text>
/* ++++++++++ Class: </xsl:text><xsl:value-of select="$class"/><xsl:text> ++++++++++ */ -AMQP_Client::</xsl:text><xsl:value-of select="$class"/><xsl:text>::</xsl:text><xsl:value-of select="$class"/><xsl:text>(OutputHandler* _out) : +AMQP_ServerProxy::</xsl:text><xsl:value-of select="$class"/><xsl:text>::</xsl:text><xsl:value-of select="$class"/><xsl:text>(OutputHandler* _out) : out(_out) { } -AMQP_Client::</xsl:text><xsl:value-of select="$class"/><xsl:text>::~</xsl:text><xsl:value-of select="$class"/><xsl:text>() {}

</xsl:text> +AMQP_ServerProxy::</xsl:text><xsl:value-of select="$class"/><xsl:text>::~</xsl:text><xsl:value-of select="$class"/><xsl:text>() {}

</xsl:text> <xsl:for-each select="method"> <xsl:if test="chassis[@name='server']"> - <xsl:text>void AMQP_Client::</xsl:text><xsl:value-of select="$class"/><xsl:text>::</xsl:text> + <xsl:text>void AMQP_ServerProxy::</xsl:text><xsl:value-of select="$class"/><xsl:text>::</xsl:text> <xsl:value-of select="amqp:cpp-name(@name)"/><xsl:text>( u_int16_t channel</xsl:text><xsl:if test="field"> <xsl:text>,
 </xsl:text> <xsl:for-each select="field"> diff --git a/cpp/common/framing/generated/stylesheets/amqp_server.xsl b/cpp/common/framing/generated/stylesheets/amqp_server.xsl index 4ad29a4b95..5ff8994888 100644 --- a/cpp/common/framing/generated/stylesheets/amqp_server.xsl +++ b/cpp/common/framing/generated/stylesheets/amqp_server.xsl @@ -11,11 +11,10 @@ --> <xsl:template match="amqp" mode="server_h"> <xsl:param name="domain-cpp-table"/> - <xsl:result-document href="AMQP_Server.h" format="textFormat"> + <xsl:result-document href="AMQP_ClientProxy.h" format="textFormat"> <xsl:value-of select="amqp:copyright()"/> - <xsl:text> -#ifndef _AMQP_Server_ -#define _AMQP_Server_ +#ifndef _AMQP_ClientProxy_ +#define _AMQP_ClientProxy_ #include "AMQP_ClientOperations.h" #include "FieldTable.h" @@ -24,14 +23,15 @@ namespace qpid { namespace framing { -class AMQP_Server : virtual public AMQP_ClientOperations +class AMQP_ClientProxy : virtual public AMQP_ClientOperations { - OutputHandler* out; - public: - AMQP_Server(OutputHandler* _out); - virtual ~AMQP_Server() {}

</xsl:text> - <xsl:for-each select="class"> + + AMQP_ClientProxy(OutputHandler* _out); + virtual ~AMQP_ClientProxy() {}; + + <!-- inner classes --> + <xsl:for-each select="class"> <xsl:variable name="class" select="amqp:cpp-class-name(@name)"/> <xsl:if test="doc"> <xsl:text>
/**
===== Class: </xsl:text><xsl:value-of select="$class"/><xsl:text> =====
</xsl:text> @@ -74,15 +74,32 @@ class AMQP_Server : virtual public AMQP_ClientOperations <xsl:text> );
</xsl:text> </xsl:if> </xsl:for-each> - <xsl:text> }; /* class </xsl:text><xsl:value-of select="$class"/><xsl:text> */
</xsl:text> + <xsl:text> }; /* class </xsl:text><xsl:value-of select="$class"/> */ </xsl:for-each> - <xsl:text>}; /* class AMQP_Server */ + + <!-- Accessors for each nested class instance --> + <xsl:for-each select="class"> + <xsl:value-of select="concat(amqp:cpp-class-name(@name), '& get', amqp:cpp-class-name(@name), '()')"/>; + </xsl:for-each> + + private: + + OutputHandler* out; + + <!-- An instance of each nested class --> + <xsl:for-each select="class"> + <xsl:value-of select="concat(amqp:cpp-class-name(@name), ' ', amqp:cpp-name(@name))"/>; + </xsl:for-each> + + + + }; /* class AMQP_ClientProxy */ } /* namespace framing */ } /* namespace qpid */ -#endif
</xsl:text> - </xsl:result-document> +#endif +</xsl:result-document> </xsl:template> @@ -94,32 +111,39 @@ class AMQP_Server : virtual public AMQP_ClientOperations --> <xsl:template match="amqp" mode="server_cpp"> <xsl:param name="domain-cpp-table"/> - <xsl:result-document href="AMQP_Server.cpp" format="textFormat"> + <xsl:result-document href="AMQP_ClientProxy.cpp" format="textFormat"> <xsl:value-of select="amqp:copyright()"/> - <xsl:text> -#include "AMQP_Server.h" +#include "AMQP_ClientProxy.h" namespace qpid { namespace framing { -AMQP_Server::AMQP_Server(OutputHandler* _out) : - out(_out) +AMQP_ClientProxy::AMQP_ClientProxy(OutputHandler* _out) : + out(_out), + <!-- Initialisation of each nested class instance --> + <xsl:for-each select="class"> + <xsl:value-of select="concat(amqp:cpp-name(@name), '(_out)')"/> + <xsl:if test="position()!=last()">, + </xsl:if> + </xsl:for-each> + { -}

</xsl:text> +} + <xsl:for-each select="class"> <xsl:variable name="class" select="amqp:cpp-class-name(@name)"/> <xsl:text>
/* ++++++++++ Class: </xsl:text><xsl:value-of select="$class"/><xsl:text> ++++++++++ */ -AMQP_Server::</xsl:text><xsl:value-of select="$class"/><xsl:text>::</xsl:text><xsl:value-of select="$class"/><xsl:text>(OutputHandler* _out) : +AMQP_ClientProxy::</xsl:text><xsl:value-of select="$class"/><xsl:text>::</xsl:text><xsl:value-of select="$class"/><xsl:text>(OutputHandler* _out) : out(_out) { } -AMQP_Server::</xsl:text><xsl:value-of select="$class"/><xsl:text>::~</xsl:text><xsl:value-of select="$class"/><xsl:text>() {}

</xsl:text> +AMQP_ClientProxy::</xsl:text><xsl:value-of select="$class"/><xsl:text>::~</xsl:text><xsl:value-of select="$class"/><xsl:text>() {}

</xsl:text> <xsl:for-each select="method"> <xsl:if test="chassis[@name='client']"> - <xsl:text>void AMQP_Server::</xsl:text><xsl:value-of select="$class"/><xsl:text>::</xsl:text> + <xsl:text>void AMQP_ClientProxy::</xsl:text><xsl:value-of select="$class"/><xsl:text>::</xsl:text> <xsl:value-of select="amqp:cpp-name(@name)"/><xsl:text>( u_int16_t channel</xsl:text><xsl:if test="field"> <xsl:text>,
 </xsl:text> <xsl:for-each select="field"> @@ -145,8 +169,16 @@ AMQP_Server::</xsl:text><xsl:value-of select="$class"/><xsl:text>::~</xsl:text>< </xsl:if> </xsl:for-each> </xsl:for-each> - <xsl:text> + <!-- Accessors for each nested class instance --> + <xsl:for-each select="class"> + <xsl:value-of select="concat('AMQP_ClientProxy::', amqp:cpp-class-name(@name), '& AMQP_ClientProxy::get', amqp:cpp-class-name(@name), '()')"/>{ + <xsl:value-of select="concat(' return ', amqp:cpp-name(@name))"/>; + } + + </xsl:for-each> + + <xsl:text> } /* namespace framing */ } /* namespace qpid */
</xsl:text> </xsl:result-document> |