summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/broker/inc/SessionHandlerImpl.h2
-rw-r--r--cpp/broker/src/SessionHandlerImpl.cpp39
-rw-r--r--cpp/common/framing/generated/stylesheets/amqp_client.xsl26
-rw-r--r--cpp/common/framing/generated/stylesheets/amqp_server.xsl80
4 files changed, 89 insertions, 58 deletions
diff --git a/cpp/broker/inc/SessionHandlerImpl.h b/cpp/broker/inc/SessionHandlerImpl.h
index 14a6404c78..167bf0cc23 100644
--- a/cpp/broker/inc/SessionHandlerImpl.h
+++ b/cpp/broker/inc/SessionHandlerImpl.h
@@ -23,6 +23,7 @@
#include <vector>
#include <exception>
#include "AMQFrame.h"
+#include "AMQP_ClientProxy.h"
#include "AMQP_ServerOperations.h"
#include "AutoDelete.h"
#include "ExchangeRegistry.h"
@@ -64,6 +65,7 @@ class SessionHandlerImpl : public virtual qpid::io::SessionHandler,
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
qpid::io::SessionContext* context;
+ qpid::framing::AMQP_ClientProxy client;
QueueRegistry* queues;
ExchangeRegistry* const exchanges;
AutoDelete* const cleaner;
diff --git a/cpp/broker/src/SessionHandlerImpl.cpp b/cpp/broker/src/SessionHandlerImpl.cpp
index 19e243a01b..a75b8fcf0f 100644
--- a/cpp/broker/src/SessionHandlerImpl.cpp
+++ b/cpp/broker/src/SessionHandlerImpl.cpp
@@ -40,6 +40,7 @@ SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context,
basicHandler(new BasicHandlerImpl(this)),
exchangeHandler(new ExchangeHandlerImpl(this)),
queueHandler(new QueueHandlerImpl(this)),
+ client(context),
framemax(65536),
heartbeat(0){
@@ -90,9 +91,9 @@ void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){
}catch(ChannelException& e){
channels[channel]->close();
channels.erase(channel);
- context->send(new AMQFrame(channel, new ChannelCloseBody(e.code, e.text, method->amqpClassId(), method->amqpMethodId())));
+ client.getChannel().close(channel, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
}catch(ConnectionException& e){
- context->send(new AMQFrame(0, new ConnectionCloseBody(e.code, e.text, method->amqpClassId(), method->amqpMethodId())));
+ client.getConnection().close(0, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
}
break;
@@ -116,7 +117,7 @@ void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* header){
FieldTable properties;
string mechanisms("PLAIN");
string locales("en_US");
- context->send(new AMQFrame(0, new ConnectionStartBody(8, 0, properties, mechanisms, locales)));
+ client.getConnection().start(0, 8, 0, properties, mechanisms, locales);
}
void SessionHandlerImpl::idleOut(){
@@ -156,7 +157,7 @@ void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr body){
void SessionHandlerImpl::ConnectionHandlerImpl::startOk(u_int16_t channel, FieldTable& clientProperties, string& mechanism,
string& response, string& locale){
- parent->context->send(new AMQFrame(0, new ConnectionTuneBody(100, parent->framemax, parent->heartbeat)));
+ parent->client.getConnection().tune(0, 100, parent->framemax, parent->heartbeat);
}
void SessionHandlerImpl::ConnectionHandlerImpl::secureOk(u_int16_t channel, string& response){}
@@ -168,13 +169,13 @@ void SessionHandlerImpl::ConnectionHandlerImpl::tuneOk(u_int16_t channel, u_int1
void SessionHandlerImpl::ConnectionHandlerImpl::open(u_int16_t channel, string& virtualHost, string& capabilities, bool insist){
string knownhosts;
- parent->context->send(new AMQFrame(0, new ConnectionOpenOkBody(knownhosts)));
+ parent->client.getConnection().openOk(0, knownhosts);
}
void SessionHandlerImpl::ConnectionHandlerImpl::close(u_int16_t channel, u_int16_t replyCode, string& replyText,
u_int16_t classId, u_int16_t methodId){
- parent->context->send(new AMQFrame(0, new ConnectionCloseOkBody()));
+ parent->client.getConnection().closeOk(0);
parent->context->close();
}
@@ -186,7 +187,7 @@ void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t channel){
void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, string& outOfBand){
parent->channels[channel] = new Channel(parent->context, channel, parent->framemax);
- parent->context->send(new AMQFrame(channel, new ChannelOpenOkBody()));
+ parent->client.getChannel().openOk(channel);
}
void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t channel, bool active){}
@@ -198,7 +199,7 @@ void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t
parent->channels.erase(channel);
c->close();
delete c;
- parent->context->send(new AMQFrame(channel, new ChannelCloseOkBody()));
+ parent->client.getChannel().closeOk(channel);
}
void SessionHandlerImpl::ChannelHandlerImpl::closeOk(u_int16_t channel){}
@@ -230,7 +231,7 @@ void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16
}
parent->exchanges->getLock()->release();
if(!nowait){
- parent->context->send(new AMQFrame(channel, new ExchangeDeclareOkBody()));
+ parent->client.getExchange().declareOk(channel);
}
}
@@ -239,11 +240,8 @@ void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16
parent->exchanges->getLock()->acquire();
parent->exchanges->destroy(exchange);
parent->exchanges->getLock()->release();
- if(!nowait) parent->context->send(new AMQFrame(channel, new ExchangeDeleteOkBody()));
+ if(!nowait) parent->client.getExchange().deleteOk(channel);
}
-
-
-
void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t ticket, string& name,
bool passive, bool durable, bool exclusive,
@@ -271,8 +269,7 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t
}
if(!nowait){
name = queue->getName();
- QueueDeclareOkBody* response = new QueueDeclareOkBody(name, queue->getMessageCount(), queue->getConsumerCount());
- parent->context->send(new AMQFrame(channel, response));
+ parent->client.getQueue().declareOk(channel, name, queue->getMessageCount(), queue->getConsumerCount());
}
}
@@ -285,7 +282,7 @@ void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t tic
if(exchange){
if(routingKey.size() == 0 && queueName.size() == 0) routingKey = queue->getName();
exchange->bind(queue, routingKey, &arguments);
- if(!nowait) parent->context->send(new AMQFrame(channel, new QueueBindOkBody()));
+ if(!nowait) parent->client.getQueue().bindOk(channel);
}else{
throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName);
}
@@ -295,7 +292,7 @@ void SessionHandlerImpl::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t ti
Queue::shared_ptr queue = parent->getQueue(queueName, channel);
int count = queue->purge();
- if(!nowait) parent->context->send(new AMQFrame(channel, new QueuePurgeOkBody(count)));
+ if(!nowait) parent->client.getQueue().purgeOk(channel, count);
}
void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t ticket, string& queue,
@@ -316,7 +313,7 @@ void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t
count = q->getMessageCount();
parent->queues->destroy(queue);
}
- if(!nowait) parent->context->send(new AMQFrame(channel, new QueueDeleteOkBody(count)));
+ if(!nowait) parent->client.getQueue().deleteOk(channel, count);
}
@@ -327,7 +324,7 @@ void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t pref
//TODO: channel doesn't do anything with these qos parameters yet
parent->channels[channel]->setPrefetchSize(prefetchSize);
parent->channels[channel]->setPrefetchCount(prefetchCount);
- parent->context->send(new AMQFrame(channel, new BasicQosOkBody()));
+ parent->client.getBasic().qosOk(channel);
}
void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_t ticket,
@@ -344,7 +341,7 @@ void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_
try{
channel->consume(consumerTag, queue, !noAck, exclusive, noLocal ? parent : 0);
- if(!nowait) parent->context->send(new AMQFrame(channelId, new BasicConsumeOkBody(consumerTag)));
+ if(!nowait) parent->client.getBasic().consumeOk(channelId, consumerTag);
//allow messages to be dispatched if required as there is now a consumer:
queue->dispatch();
@@ -357,7 +354,7 @@ void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_
void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, string& consumerTag, bool nowait){
parent->channels[channel]->cancel(consumerTag);
- if(!nowait) parent->context->send(new AMQFrame(channel, new BasicCancelOkBody(consumerTag)));
+ if(!nowait) parent->client.getBasic().cancelOk(channel, consumerTag);
}
void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t ticket,
diff --git a/cpp/common/framing/generated/stylesheets/amqp_client.xsl b/cpp/common/framing/generated/stylesheets/amqp_client.xsl
index f0fa3d7890..13a912a926 100644
--- a/cpp/common/framing/generated/stylesheets/amqp_client.xsl
+++ b/cpp/common/framing/generated/stylesheets/amqp_client.xsl
@@ -11,11 +11,11 @@
-->
<xsl:template match="amqp" mode="client_h">
<xsl:param name="domain-cpp-table"/>
- <xsl:result-document href="AMQP_Client.h" format="textFormat">
+ <xsl:result-document href="AMQP_ServerProxy.h" format="textFormat">
<xsl:value-of select="amqp:copyright()"/>
<xsl:text>
-#ifndef _AMQP_Client_
-#define _AMQP_Client_
+#ifndef _AMQP_ServerProxy_
+#define _AMQP_ServerProxy_
#include "AMQP_ServerOperations.h"
#include "FieldTable.h"
@@ -24,13 +24,13 @@
namespace qpid {
namespace framing {
-class AMQP_Client : virtual public AMQP_ServerOperations
+class AMQP_ServerProxy : virtual public AMQP_ServerOperations
{
OutputHandler* out;
public:
- AMQP_Client(OutputHandler* _out);
- virtual ~AMQP_Client() {}&#xA;&#xA;</xsl:text>
+ AMQP_ServerProxy(OutputHandler* _out);
+ virtual ~AMQP_ServerProxy() {}&#xA;&#xA;</xsl:text>
<xsl:for-each select="class">
<xsl:variable name="class" select="amqp:cpp-class-name(@name)"/>
<xsl:if test="doc">
@@ -76,7 +76,7 @@ class AMQP_Client : virtual public AMQP_ServerOperations
</xsl:for-each>
<xsl:text> }; /* class </xsl:text><xsl:value-of select="$class"/><xsl:text> */&#xA;</xsl:text>
</xsl:for-each>
- <xsl:text>}; /* class AMQP_Client */
+ <xsl:text>}; /* class AMQP_ServerProxy */
} /* namespace framing */
} /* namespace qpid */
@@ -94,16 +94,16 @@ class AMQP_Client : virtual public AMQP_ServerOperations
-->
<xsl:template match="amqp" mode="client_cpp">
<xsl:param name="domain-cpp-table"/>
- <xsl:result-document href="AMQP_Client.cpp" format="textFormat">
+ <xsl:result-document href="AMQP_ServerProxy.cpp" format="textFormat">
<xsl:value-of select="amqp:copyright()"/>
<xsl:text>
-#include "AMQP_Client.h"
+#include "AMQP_ServerProxy.h"
namespace qpid {
namespace framing {
-AMQP_Client::AMQP_Client(OutputHandler* _out) :
+AMQP_ServerProxy::AMQP_ServerProxy(OutputHandler* _out) :
out(_out)
{
}&#xA;&#xA;</xsl:text>
@@ -111,15 +111,15 @@ AMQP_Client::AMQP_Client(OutputHandler* _out) :
<xsl:variable name="class" select="amqp:cpp-class-name(@name)"/>
<xsl:text>&#xA;/* ++++++++++ Class: </xsl:text><xsl:value-of select="$class"/><xsl:text> ++++++++++ */
-AMQP_Client::</xsl:text><xsl:value-of select="$class"/><xsl:text>::</xsl:text><xsl:value-of select="$class"/><xsl:text>(OutputHandler* _out) :
+AMQP_ServerProxy::</xsl:text><xsl:value-of select="$class"/><xsl:text>::</xsl:text><xsl:value-of select="$class"/><xsl:text>(OutputHandler* _out) :
out(_out)
{
}
-AMQP_Client::</xsl:text><xsl:value-of select="$class"/><xsl:text>::~</xsl:text><xsl:value-of select="$class"/><xsl:text>() {}&#xA;&#xA;</xsl:text>
+AMQP_ServerProxy::</xsl:text><xsl:value-of select="$class"/><xsl:text>::~</xsl:text><xsl:value-of select="$class"/><xsl:text>() {}&#xA;&#xA;</xsl:text>
<xsl:for-each select="method">
<xsl:if test="chassis[@name='server']">
- <xsl:text>void AMQP_Client::</xsl:text><xsl:value-of select="$class"/><xsl:text>::</xsl:text>
+ <xsl:text>void AMQP_ServerProxy::</xsl:text><xsl:value-of select="$class"/><xsl:text>::</xsl:text>
<xsl:value-of select="amqp:cpp-name(@name)"/><xsl:text>( u_int16_t channel</xsl:text><xsl:if test="field">
<xsl:text>,&#xA; </xsl:text>
<xsl:for-each select="field">
diff --git a/cpp/common/framing/generated/stylesheets/amqp_server.xsl b/cpp/common/framing/generated/stylesheets/amqp_server.xsl
index 4ad29a4b95..5ff8994888 100644
--- a/cpp/common/framing/generated/stylesheets/amqp_server.xsl
+++ b/cpp/common/framing/generated/stylesheets/amqp_server.xsl
@@ -11,11 +11,10 @@
-->
<xsl:template match="amqp" mode="server_h">
<xsl:param name="domain-cpp-table"/>
- <xsl:result-document href="AMQP_Server.h" format="textFormat">
+ <xsl:result-document href="AMQP_ClientProxy.h" format="textFormat">
<xsl:value-of select="amqp:copyright()"/>
- <xsl:text>
-#ifndef _AMQP_Server_
-#define _AMQP_Server_
+#ifndef _AMQP_ClientProxy_
+#define _AMQP_ClientProxy_
#include "AMQP_ClientOperations.h"
#include "FieldTable.h"
@@ -24,14 +23,15 @@
namespace qpid {
namespace framing {
-class AMQP_Server : virtual public AMQP_ClientOperations
+class AMQP_ClientProxy : virtual public AMQP_ClientOperations
{
- OutputHandler* out;
-
public:
- AMQP_Server(OutputHandler* _out);
- virtual ~AMQP_Server() {}&#xA;&#xA;</xsl:text>
- <xsl:for-each select="class">
+
+ AMQP_ClientProxy(OutputHandler* _out);
+ virtual ~AMQP_ClientProxy() {};
+
+ <!-- inner classes -->
+ <xsl:for-each select="class">
<xsl:variable name="class" select="amqp:cpp-class-name(@name)"/>
<xsl:if test="doc">
<xsl:text>&#xA;/**&#xA;===== Class: </xsl:text><xsl:value-of select="$class"/><xsl:text> =====&#xA;</xsl:text>
@@ -74,15 +74,32 @@ class AMQP_Server : virtual public AMQP_ClientOperations
<xsl:text> );&#xA;</xsl:text>
</xsl:if>
</xsl:for-each>
- <xsl:text> }; /* class </xsl:text><xsl:value-of select="$class"/><xsl:text> */&#xA;</xsl:text>
+ <xsl:text> }; /* class </xsl:text><xsl:value-of select="$class"/> */
</xsl:for-each>
- <xsl:text>}; /* class AMQP_Server */
+
+ <!-- Accessors for each nested class instance -->
+ <xsl:for-each select="class">
+ <xsl:value-of select="concat(amqp:cpp-class-name(@name), '&amp; get', amqp:cpp-class-name(@name), '()')"/>;
+ </xsl:for-each>
+
+ private:
+
+ OutputHandler* out;
+
+ <!-- An instance of each nested class -->
+ <xsl:for-each select="class">
+ <xsl:value-of select="concat(amqp:cpp-class-name(@name), ' ', amqp:cpp-name(@name))"/>;
+ </xsl:for-each>
+
+
+
+ }; /* class AMQP_ClientProxy */
} /* namespace framing */
} /* namespace qpid */
-#endif&#xA;</xsl:text>
- </xsl:result-document>
+#endif
+</xsl:result-document>
</xsl:template>
@@ -94,32 +111,39 @@ class AMQP_Server : virtual public AMQP_ClientOperations
-->
<xsl:template match="amqp" mode="server_cpp">
<xsl:param name="domain-cpp-table"/>
- <xsl:result-document href="AMQP_Server.cpp" format="textFormat">
+ <xsl:result-document href="AMQP_ClientProxy.cpp" format="textFormat">
<xsl:value-of select="amqp:copyright()"/>
- <xsl:text>
-#include "AMQP_Server.h"
+#include "AMQP_ClientProxy.h"
namespace qpid {
namespace framing {
-AMQP_Server::AMQP_Server(OutputHandler* _out) :
- out(_out)
+AMQP_ClientProxy::AMQP_ClientProxy(OutputHandler* _out) :
+ out(_out),
+ <!-- Initialisation of each nested class instance -->
+ <xsl:for-each select="class">
+ <xsl:value-of select="concat(amqp:cpp-name(@name), '(_out)')"/>
+ <xsl:if test="position()!=last()">,
+ </xsl:if>
+ </xsl:for-each>
+
{
-}&#xA;&#xA;</xsl:text>
+}
+
<xsl:for-each select="class">
<xsl:variable name="class" select="amqp:cpp-class-name(@name)"/>
<xsl:text>&#xA;/* ++++++++++ Class: </xsl:text><xsl:value-of select="$class"/><xsl:text> ++++++++++ */
-AMQP_Server::</xsl:text><xsl:value-of select="$class"/><xsl:text>::</xsl:text><xsl:value-of select="$class"/><xsl:text>(OutputHandler* _out) :
+AMQP_ClientProxy::</xsl:text><xsl:value-of select="$class"/><xsl:text>::</xsl:text><xsl:value-of select="$class"/><xsl:text>(OutputHandler* _out) :
out(_out)
{
}
-AMQP_Server::</xsl:text><xsl:value-of select="$class"/><xsl:text>::~</xsl:text><xsl:value-of select="$class"/><xsl:text>() {}&#xA;&#xA;</xsl:text>
+AMQP_ClientProxy::</xsl:text><xsl:value-of select="$class"/><xsl:text>::~</xsl:text><xsl:value-of select="$class"/><xsl:text>() {}&#xA;&#xA;</xsl:text>
<xsl:for-each select="method">
<xsl:if test="chassis[@name='client']">
- <xsl:text>void AMQP_Server::</xsl:text><xsl:value-of select="$class"/><xsl:text>::</xsl:text>
+ <xsl:text>void AMQP_ClientProxy::</xsl:text><xsl:value-of select="$class"/><xsl:text>::</xsl:text>
<xsl:value-of select="amqp:cpp-name(@name)"/><xsl:text>( u_int16_t channel</xsl:text><xsl:if test="field">
<xsl:text>,&#xA; </xsl:text>
<xsl:for-each select="field">
@@ -145,8 +169,16 @@ AMQP_Server::</xsl:text><xsl:value-of select="$class"/><xsl:text>::~</xsl:text><
</xsl:if>
</xsl:for-each>
</xsl:for-each>
- <xsl:text>
+ <!-- Accessors for each nested class instance -->
+ <xsl:for-each select="class">
+ <xsl:value-of select="concat('AMQP_ClientProxy::', amqp:cpp-class-name(@name), '&amp; AMQP_ClientProxy::get', amqp:cpp-class-name(@name), '()')"/>{
+ <xsl:value-of select="concat(' return ', amqp:cpp-name(@name))"/>;
+ }
+
+ </xsl:for-each>
+
+ <xsl:text>
} /* namespace framing */
} /* namespace qpid */&#xA;</xsl:text>
</xsl:result-document>