summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-01-15 21:56:23 +0000
committerAlan Conway <aconway@apache.org>2007-01-15 21:56:23 +0000
commitef1469a7ea1f54f266aee8f2899b7cd0c7e07d08 (patch)
tree3b69ec6c589ff8edd628f2e218589180cbca005b
parent5aaad510dc978dc09f92c774c81255b7af6b8b68 (diff)
downloadqpid-python-ef1469a7ea1f54f266aee8f2899b7cd0c7e07d08.tar.gz
* Client & broker using Requester/Responder to manage request/response IDs.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@496511 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/lib/broker/Broker.cpp2
-rw-r--r--cpp/lib/broker/Broker.h13
-rw-r--r--cpp/lib/broker/SessionHandlerImpl.cpp257
-rw-r--r--cpp/lib/broker/SessionHandlerImpl.h35
-rw-r--r--cpp/lib/client/Connection.cpp18
-rw-r--r--cpp/lib/client/Connection.h5
-rw-r--r--cpp/lib/client/Connector.cpp19
-rw-r--r--cpp/lib/client/Connector.h109
-rw-r--r--cpp/lib/common/framing/AMQFrame.h9
-rw-r--r--cpp/lib/common/framing/AMQRequestBody.cpp6
-rw-r--r--cpp/lib/common/framing/AMQRequestBody.h6
-rw-r--r--cpp/lib/common/framing/AMQResponseBody.cpp5
-rw-r--r--cpp/lib/common/framing/AMQResponseBody.h7
-rw-r--r--cpp/lib/common/framing/ProtocolVersionException.h11
-rw-r--r--cpp/lib/common/framing/Requester.h2
-rw-r--r--gentools/templ.cpp/MethodBodyClass.h.tmpl3
16 files changed, 321 insertions, 186 deletions
diff --git a/cpp/lib/broker/Broker.cpp b/cpp/lib/broker/Broker.cpp
index 6a8b1f8538..c2117eaf23 100644
--- a/cpp/lib/broker/Broker.cpp
+++ b/cpp/lib/broker/Broker.cpp
@@ -23,6 +23,7 @@
#include "AMQFrame.h"
#include "DirectExchange.h"
+#include "TopicExchange.h"
#include "FanOutExchange.h"
#include "HeadersExchange.h"
#include "MessageStoreModule.h"
@@ -102,3 +103,4 @@ const int16_t Broker::DEFAULT_PORT(5672);
}} // namespace qpid::broker
+
diff --git a/cpp/lib/broker/Broker.h b/cpp/lib/broker/Broker.h
index f831b680e9..ad7fbb1eca 100644
--- a/cpp/lib/broker/Broker.h
+++ b/cpp/lib/broker/Broker.h
@@ -29,6 +29,15 @@
#include <SharedObject.h>
#include <MessageStore.h>
#include <AutoDelete.h>
+#include "Requester.h"
+#include "Responder.h"
+#include <ExchangeRegistry.h>
+#include <BrokerChannel.h>
+#include <ConnectionToken.h>
+#include <DirectExchange.h>
+#include <OutputHandler.h>
+#include <ProtocolInitiation.h>
+#include <QueueRegistry.h>
namespace qpid {
namespace broker {
@@ -77,6 +86,8 @@ class Broker : public qpid::sys::Runnable,
u_int32_t getTimeout() { return timeout; }
u_int64_t getStagingThreshold() { return stagingThreshold; }
AutoDelete& getCleaner() { return cleaner; }
+ qpid::framing::Requester& getRequester() { return requester; }
+ qpid::framing::Responder& getResponder() { return responder; }
private:
Broker(const Configuration& config);
@@ -89,6 +100,8 @@ class Broker : public qpid::sys::Runnable,
u_int64_t stagingThreshold;
AutoDelete cleaner;
SessionHandlerFactoryImpl factory;
+ qpid::framing::Requester requester;
+ qpid::framing::Responder responder;
};
}}
diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/SessionHandlerImpl.cpp
index 905ac83b92..d7f6320535 100644
--- a/cpp/lib/broker/SessionHandlerImpl.cpp
+++ b/cpp/lib/broker/SessionHandlerImpl.cpp
@@ -19,11 +19,15 @@
*
*/
#include <iostream>
-#include <SessionHandlerImpl.h>
-#include <FanOutExchange.h>
-#include <HeadersExchange.h>
-#include <TopicExchange.h>
-#include "assert.h"
+#include <assert.h>
+
+#include "SessionHandlerImpl.h"
+
+#include "FanOutExchange.h"
+#include "HeadersExchange.h"
+
+#include "Requester.h"
+#include "Responder.h"
using namespace boost;
using namespace qpid::sys;
@@ -42,6 +46,8 @@ SessionHandlerImpl::SessionHandlerImpl(
exchanges(broker.getExchanges()),
cleaner(broker.getCleaner()),
settings(broker.getTimeout(), broker.getStagingThreshold()),
+ requester(broker.getRequester()),
+ responder(broker.getResponder()),
basicHandler(new BasicHandlerImpl(this)),
channelHandler(new ChannelHandlerImpl(this)),
connectionHandler(new ConnectionHandlerImpl(this)),
@@ -55,7 +61,7 @@ SessionHandlerImpl::SessionHandlerImpl(
SessionHandlerImpl::~SessionHandlerImpl(){
- if (client != NULL)
+ if (client != NULL)
delete client;
}
@@ -87,51 +93,87 @@ Exchange::shared_ptr SessionHandlerImpl::findExchange(const string& name){
return exchanges.get(name);
}
+void SessionHandlerImpl::handleMethod(
+ u_int16_t channel, qpid::framing::AMQBody::shared_ptr body)
+{
+ AMQMethodBody::shared_ptr method =
+ shared_polymorphic_cast<AMQMethodBody, AMQBody>(body);
+ try{
+ method->invoke(*this, channel);
+ }catch(ChannelException& e){
+ channels[channel]->close();
+ channels.erase(channel);
+ client->getChannel().close(
+ channel, e.code, e.text,
+ method->amqpClassId(), method->amqpMethodId());
+ }catch(ConnectionException& e){
+ client->getConnection().close(
+ 0, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
+ }catch(std::exception& e){
+ client->getConnection().close(
+ 0, 541/*internal error*/, e.what(),
+ method->amqpClassId(), method->amqpMethodId());
+ }
+}
+
void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){
u_int16_t channel = frame->getChannel();
AMQBody::shared_ptr body = frame->getBody();
- AMQMethodBody::shared_ptr method;
-
switch(body->type())
{
case REQUEST_BODY:
- // responder.received(frame);
+ responder.received(AMQRequestBody::getData(body));
+ handleMethod(channel, body);
+ break;
case RESPONSE_BODY:
- // requester.received(frame);
- case METHOD_BODY: //
- method = dynamic_pointer_cast<AMQMethodBody, AMQBody>(body);
- try{
- method->invoke(*this, channel);
- }catch(ChannelException& e){
- channels[channel]->close();
- channels.erase(channel);
- client->getChannel().close(channel, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
- }catch(ConnectionException& e){
- client->getConnection().close(0, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
- }catch(std::exception& e){
- string error(e.what());
- client->getConnection().close(0, 541/*internal error*/, error, method->amqpClassId(), method->amqpMethodId());
- }
- break;
-
- case HEADER_BODY:
- this->handleHeader(channel, dynamic_pointer_cast<AMQHeaderBody, AMQBody>(body));
+ // Must process responses before marking them received.
+ handleMethod(channel, body);
+ requester.processed(AMQResponseBody::getData(body));
+ break;
+ // TODO aconway 2007-01-15: Leftover from 0-8 support, remove.
+ case METHOD_BODY:
+ handleMethod(channel, body);
+ break;
+ case HEADER_BODY:
+ handleHeader(
+ channel, shared_polymorphic_cast<AMQHeaderBody>(body));
break;
- case CONTENT_BODY:
- this->handleContent(channel, dynamic_pointer_cast<AMQContentBody, AMQBody>(body));
+ case CONTENT_BODY:
+ handleContent(
+ channel, shared_polymorphic_cast<AMQContentBody>(body));
break;
- case HEARTBEAT_BODY:
- //channel must be 0
- this->handleHeartbeat(dynamic_pointer_cast<AMQHeartbeatBody, AMQBody>(body));
+ case HEARTBEAT_BODY:
+ assert(channel == 0);
+ handleHeartbeat(
+ shared_polymorphic_cast<AMQHeartbeatBody>(body));
break;
}
}
+/**
+ * An OutputHandler that does request/response procssing before
+ * delgating to another OutputHandler.
+ */
+SessionHandlerImpl::Sender::Sender(
+ OutputHandler& oh, Requester& req, Responder& resp)
+ : out(oh), requester(req), responder(resp)
+{}
+
+void SessionHandlerImpl::Sender::send(AMQFrame* frame) {
+ AMQBody::shared_ptr body = frame->getBody();
+ u_int16_t type = body->type();
+ if (type == REQUEST_BODY)
+ requester.sending(AMQRequestBody::getData(body));
+ else if (type == RESPONSE_BODY)
+ responder.sending(AMQResponseBody::getData(body));
+ out.send(frame);
+}
+
void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* header){
- if (client == NULL)
+ if (client == 0)
{
client = new qpid::framing::AMQP_ClientProxy(context, header->getMajor(), header->getMinor());
@@ -280,7 +322,7 @@ void SessionHandlerImpl::ExchangeHandlerImpl::unbind(
const string& /*routingKey*/,
const qpid::framing::FieldTable& /*arguments*/ )
{
- assert(0); // FIXME aconway 2007-01-04: 0-9 feature
+ assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
@@ -335,9 +377,9 @@ void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*t
Queue::shared_ptr queue = parent->getQueue(queueName, channel);
Exchange::shared_ptr exchange = parent->exchanges.get(exchangeName);
if(exchange){
-// kpvdr - cannot use this any longer as routingKey is now const
-// if(routingKey.empty() && queueName.empty()) routingKey = queue->getName();
-// exchange->bind(queue, routingKey, &arguments);
+ // kpvdr - cannot use this any longer as routingKey is now const
+ // if(routingKey.empty() && queueName.empty()) routingKey = queue->getName();
+ // exchange->bind(queue, routingKey, &arguments);
string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
exchange->bind(queue, exchangeRoutingKey, &arguments);
if(!nowait) parent->client->getQueue().bindOk(channel);
@@ -483,25 +525,25 @@ SessionHandlerImpl::QueueHandlerImpl::unbind(
const string& /*routingKey*/,
const qpid::framing::FieldTable& /*arguments*/ )
{
- assert(0); // FIXME aconway 2007-01-04: 0-9 feature
+ assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
void
SessionHandlerImpl::ChannelHandlerImpl::ok( u_int16_t /*channel*/ )
{
- assert(0); // FIXME aconway 2007-01-04: 0-9 feature
+ assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
void
SessionHandlerImpl::ChannelHandlerImpl::ping( u_int16_t /*channel*/ )
{
- assert(0); // FIXME aconway 2007-01-04: 0-9 feature
+ assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
void
SessionHandlerImpl::ChannelHandlerImpl::pong( u_int16_t /*channel*/ )
{
- assert(0); // FIXME aconway 2007-01-04: 0-9 feature
+ assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
void
@@ -509,148 +551,149 @@ SessionHandlerImpl::ChannelHandlerImpl::resume(
u_int16_t /*channel*/,
const string& /*channelId*/ )
{
- assert(0); // FIXME aconway 2007-01-04: 0-9 feature
+ assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
// Message class method handlers
void
SessionHandlerImpl::MessageHandlerImpl::append( u_int16_t /*channel*/,
- const string& /*reference*/,
- const string& /*bytes*/ )
+ const string& /*reference*/,
+ const string& /*bytes*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::cancel( u_int16_t /*channel*/,
- const string& /*destination*/ )
+ const string& /*destination*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/,
- const string& /*reference*/,
- const string& /*identifier*/ )
+ const string& /*reference*/,
+ const string& /*identifier*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::close( u_int16_t /*channel*/,
- const string& /*reference*/ )
+ const string& /*reference*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::consume( u_int16_t /*channel*/,
- u_int16_t /*ticket*/,
- const string& /*queue*/,
- const string& /*destination*/,
- bool /*noLocal*/,
- bool /*noAck*/,
- bool /*exclusive*/,
- const qpid::framing::FieldTable& /*filter*/ )
+ u_int16_t /*ticket*/,
+ const string& /*queue*/,
+ const string& /*destination*/,
+ bool /*noLocal*/,
+ bool /*noAck*/,
+ bool /*exclusive*/,
+ const qpid::framing::FieldTable& /*filter*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::empty( u_int16_t /*channel*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::get( u_int16_t /*channel*/,
- u_int16_t /*ticket*/,
- const string& /*queue*/,
- const string& /*destination*/,
- bool /*noAck*/ )
+ u_int16_t /*ticket*/,
+ const string& /*queue*/,
+ const string& /*destination*/,
+ bool /*noAck*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::offset( u_int16_t /*channel*/,
- u_int64_t /*value*/ )
+ u_int64_t /*value*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::ok( u_int16_t /*channel*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::open( u_int16_t /*channel*/,
- const string& /*reference*/ )
+ const string& /*reference*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::qos( u_int16_t /*channel*/,
- u_int32_t /*prefetchSize*/,
- u_int16_t /*prefetchCount*/,
- bool /*global*/ )
+ u_int32_t /*prefetchSize*/,
+ u_int16_t /*prefetchCount*/,
+ bool /*global*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::recover( u_int16_t /*channel*/,
- bool /*requeue*/ )
+ bool /*requeue*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::reject( u_int16_t /*channel*/,
- u_int16_t /*code*/,
- const string& /*text*/ )
+ u_int16_t /*code*/,
+ const string& /*text*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::resume( u_int16_t /*channel*/,
- const string& /*reference*/,
- const string& /*identifier*/ )
+ const string& /*reference*/,
+ const string& /*identifier*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::transfer( u_int16_t /*channel*/,
- u_int16_t /*ticket*/,
- const string& /*destination*/,
- bool /*redelivered*/,
- bool /*immediate*/,
- u_int64_t /*ttl*/,
- u_int8_t /*priority*/,
- u_int64_t /*timestamp*/,
- u_int8_t /*deliveryMode*/,
- u_int64_t /*expiration*/,
- const string& /*exchange*/,
- const string& /*routingKey*/,
- const string& /*messageId*/,
- const string& /*correlationId*/,
- const string& /*replyTo*/,
- const string& /*contentType*/,
- const string& /*contentEncoding*/,
- const string& /*userId*/,
- const string& /*appId*/,
- const string& /*transactionId*/,
- const string& /*securityToken*/,
- const qpid::framing::FieldTable& /*applicationHeaders*/,
- qpid::framing::Content /*body*/ )
+ u_int16_t /*ticket*/,
+ const string& /*destination*/,
+ bool /*redelivered*/,
+ bool /*immediate*/,
+ u_int64_t /*ttl*/,
+ u_int8_t /*priority*/,
+ u_int64_t /*timestamp*/,
+ u_int8_t /*deliveryMode*/,
+ u_int64_t /*expiration*/,
+ const string& /*exchange*/,
+ const string& /*routingKey*/,
+ const string& /*messageId*/,
+ const string& /*correlationId*/,
+ const string& /*replyTo*/,
+ const string& /*contentType*/,
+ const string& /*contentEncoding*/,
+ const string& /*userId*/,
+ const string& /*appId*/,
+ const string& /*transactionId*/,
+ const string& /*securityToken*/,
+ const qpid::framing::FieldTable& /*applicationHeaders*/,
+ qpid::framing::Content /*body*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
}}
+
diff --git a/cpp/lib/broker/SessionHandlerImpl.h b/cpp/lib/broker/SessionHandlerImpl.h
index 08b05a11b6..070bd1266e 100644
--- a/cpp/lib/broker/SessionHandlerImpl.h
+++ b/cpp/lib/broker/SessionHandlerImpl.h
@@ -24,28 +24,20 @@
#include <map>
#include <sstream>
#include <vector>
-#include <exception>
+
#include <AMQFrame.h>
#include <AMQP_ClientProxy.h>
#include <AMQP_ServerOperations.h>
-#include <AutoDelete.h>
-#include <ExchangeRegistry.h>
-#include <BrokerChannel.h>
-#include <ConnectionToken.h>
-#include <DirectExchange.h>
-#include <OutputHandler.h>
-#include <ProtocolInitiation.h>
-#include <QueueRegistry.h>
#include <sys/SessionContext.h>
#include <sys/SessionHandler.h>
#include <sys/TimeoutHandler.h>
-#include <TopicExchange.h>
#include "Broker.h"
+#include "Exception.h"
namespace qpid {
namespace broker {
-struct ChannelException : public std::exception {
+struct ChannelException : public qpid::Exception {
u_int16_t code;
string text;
ChannelException(u_int16_t _code, string _text) : code(_code), text(_text) {}
@@ -53,7 +45,7 @@ struct ChannelException : public std::exception {
const char* what() const throw() { return text.c_str(); }
};
-struct ConnectionException : public std::exception {
+struct ConnectionException : public qpid::Exception {
u_int16_t code;
string text;
ConnectionException(u_int16_t _code, string _text) : code(_code), text(_text) {}
@@ -75,13 +67,25 @@ class SessionHandlerImpl : public qpid::sys::SessionHandler,
{
typedef std::map<u_int16_t, Channel*>::iterator channel_iterator;
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
-
+ class Sender : public qpid::framing::OutputHandler {
+ public:
+ Sender(qpid::framing::OutputHandler&,
+ qpid::framing::Requester&, qpid::framing::Responder&);
+ void send(qpid::framing::AMQFrame* frame);
+ private:
+ OutputHandler& out;
+ qpid::framing::Requester& requester;
+ qpid::framing::Responder& responder;
+ };
+
qpid::sys::SessionContext* context;
qpid::framing::AMQP_ClientProxy* client;
QueueRegistry& queues;
ExchangeRegistry& exchanges;
AutoDelete& cleaner;
Settings settings;
+ qpid::framing::Requester& requester;
+ qpid::framing::Responder& responder;
std::auto_ptr<BasicHandler> basicHandler;
std::auto_ptr<ChannelHandler> channelHandler;
std::auto_ptr<ConnectionHandler> connectionHandler;
@@ -98,6 +102,7 @@ class SessionHandlerImpl : public qpid::sys::SessionHandler,
void handleHeader(u_int16_t channel, qpid::framing::AMQHeaderBody::shared_ptr body);
void handleContent(u_int16_t channel, qpid::framing::AMQContentBody::shared_ptr body);
+ void handleMethod(u_int16_t channel, qpid::framing::AMQBody::shared_ptr body);
void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
Channel* getChannel(u_int16_t channel);
@@ -371,8 +376,6 @@ class SessionHandlerImpl : public qpid::sys::SessionHandler,
virtual TunnelHandler* getTunnelHandler(){ throw ConnectionException(540, "Tunnel class not implemented"); }
};
-}
-}
-
+}}
#endif
diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp
index ad8aa1d0dd..10a0b50aad 100644
--- a/cpp/lib/client/Connection.cpp
+++ b/cpp/lib/client/Connection.cpp
@@ -32,10 +32,14 @@ using namespace qpid::sys;
u_int16_t Connection::channelIdCounter;
-Connection::Connection( bool debug, u_int32_t _max_frame_size, qpid::framing::ProtocolVersion* _version) : max_frame_size(_max_frame_size), closed(true),
+Connection::Connection(
+ bool debug, u_int32_t _max_frame_size,
+ qpid::framing::ProtocolVersion* _version
+) : max_frame_size(_max_frame_size), closed(true),
version(_version->getMajor(),_version->getMinor())
{
- connector = new Connector(version, debug, _max_frame_size);
+ connector = new Connector(
+ version, requester, responder, debug, _max_frame_size);
}
Connection::~Connection(){
@@ -152,6 +156,16 @@ void Connection::removeChannel(Channel* channel){
}
void Connection::received(AMQFrame* frame){
+ AMQBody::shared_ptr body = frame->getBody();
+ u_int8_t type = body->type();
+ if (type == REQUEST_BODY)
+ responder.received(AMQRequestBody::getData(body));
+ handleFrame(frame);
+ if (type == RESPONSE_BODY)
+ requester.processed(AMQResponseBody::getData(body));
+}
+
+void Connection::handleFrame(AMQFrame* frame){
u_int16_t channelId = frame->getChannel();
if(channelId == 0){
diff --git a/cpp/lib/client/Connection.h b/cpp/lib/client/Connection.h
index 37e65e6099..21e2fb90a2 100644
--- a/cpp/lib/client/Connection.h
+++ b/cpp/lib/client/Connection.h
@@ -37,6 +37,8 @@
#include <ClientQueue.h>
#include <ResponseHandler.h>
#include <AMQP_HighestVersion.h>
+#include "Requester.h"
+#include "Responder.h"
namespace qpid {
@@ -79,6 +81,8 @@ namespace client {
ResponseHandler responses;
volatile bool closed;
qpid::framing::ProtocolVersion version;
+ qpid::framing::Requester requester;
+ qpid::framing::Responder responder;
void channelException(Channel* channel, qpid::framing::AMQMethodBody* body, QpidError& e);
void error(int code, const std::string& msg, int classid = 0, int methodid = 0);
@@ -89,6 +93,7 @@ namespace client {
virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body);
virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body);
virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
+ void handleFrame(qpid::framing::AMQFrame* frame);
public:
/**
diff --git a/cpp/lib/client/Connector.cpp b/cpp/lib/client/Connector.cpp
index b34e66fd94..d05540ba32 100644
--- a/cpp/lib/client/Connector.cpp
+++ b/cpp/lib/client/Connector.cpp
@@ -22,13 +22,17 @@
#include <QpidError.h>
#include <sys/Time.h>
#include "Connector.h"
+#include "Requester.h"
+#include "Responder.h"
using namespace qpid::sys;
using namespace qpid::client;
using namespace qpid::framing;
using qpid::QpidError;
-Connector::Connector(const qpid::framing::ProtocolVersion& pVersion, bool _debug, u_int32_t buffer_size) :
+Connector::Connector(const qpid::framing::ProtocolVersion& pVersion,
+ Requester& req, Responder& resp,
+ bool _debug, u_int32_t buffer_size) :
debug(_debug),
receive_buffer_size(buffer_size),
send_buffer_size(buffer_size),
@@ -40,7 +44,10 @@ Connector::Connector(const qpid::framing::ProtocolVersion& pVersion, bool _debug
timeoutHandler(0),
shutdownHandler(0),
inbuf(receive_buffer_size),
- outbuf(send_buffer_size){ }
+ outbuf(send_buffer_size),
+ requester(req),
+ responder(resp)
+{ }
Connector::~Connector(){ }
@@ -75,7 +82,13 @@ OutputHandler* Connector::getOutputHandler(){
}
void Connector::send(AMQFrame* frame){
- writeBlock(frame);
+ AMQBody::shared_ptr body = frame->getBody();
+ u_int8_t type = body->type();
+ if (type == REQUEST_BODY)
+ requester.sending(AMQRequestBody::getData(body));
+ else if (type == RESPONSE_BODY)
+ responder.sending(AMQResponseBody::getData(body));
+ writeBlock(frame);
if(debug) std::cout << "SENT: " << *frame << std::endl;
delete frame;
}
diff --git a/cpp/lib/client/Connector.h b/cpp/lib/client/Connector.h
index f9e50f3216..02926b2bdb 100644
--- a/cpp/lib/client/Connector.h
+++ b/cpp/lib/client/Connector.h
@@ -34,60 +34,73 @@
#include <sys/Socket.h>
namespace qpid {
+
+namespace framing {
+
+class Requester;
+class Responder;
+
+} // namespace framing
+
namespace client {
- class Connector : public qpid::framing::OutputHandler,
- private qpid::sys::Runnable
- {
- const bool debug;
- const int receive_buffer_size;
- const int send_buffer_size;
- qpid::framing::ProtocolVersion version;
-
- bool closed;
-
- int64_t lastIn;
- int64_t lastOut;
- int64_t timeout;
- u_int32_t idleIn;
- u_int32_t idleOut;
-
- qpid::sys::TimeoutHandler* timeoutHandler;
- qpid::sys::ShutdownHandler* shutdownHandler;
- qpid::framing::InputHandler* input;
- qpid::framing::InitiationHandler* initialiser;
- qpid::framing::OutputHandler* output;
+class Connector : public qpid::framing::OutputHandler,
+ private qpid::sys::Runnable
+{
+ const bool debug;
+ const int receive_buffer_size;
+ const int send_buffer_size;
+ qpid::framing::ProtocolVersion version;
+
+ bool closed;
+
+ int64_t lastIn;
+ int64_t lastOut;
+ int64_t timeout;
+ u_int32_t idleIn;
+ u_int32_t idleOut;
+
+ qpid::sys::TimeoutHandler* timeoutHandler;
+ qpid::sys::ShutdownHandler* shutdownHandler;
+ qpid::framing::InputHandler* input;
+ qpid::framing::InitiationHandler* initialiser;
+ qpid::framing::OutputHandler* output;
- qpid::framing::Buffer inbuf;
- qpid::framing::Buffer outbuf;
+ qpid::framing::Buffer inbuf;
+ qpid::framing::Buffer outbuf;
+
+ qpid::sys::Mutex writeLock;
+ qpid::sys::Thread receiver;
- qpid::sys::Mutex writeLock;
- qpid::sys::Thread receiver;
+ qpid::sys::Socket socket;
- qpid::sys::Socket socket;
+ qpid::framing::Requester& requester;
+ qpid::framing::Responder& responder;
- void checkIdle(ssize_t status);
- void writeBlock(qpid::framing::AMQDataBlock* data);
- void writeToSocket(char* data, size_t available);
- void setSocketTimeout();
-
- void run();
- void handleClosed();
-
- public:
- Connector(const qpid::framing::ProtocolVersion& pVersion, bool debug = false, u_int32_t buffer_size = 1024);
- virtual ~Connector();
- virtual void connect(const std::string& host, int port);
- virtual void init(qpid::framing::ProtocolInitiation* header);
- virtual void close();
- virtual void setInputHandler(qpid::framing::InputHandler* handler);
- virtual void setTimeoutHandler(qpid::sys::TimeoutHandler* handler);
- virtual void setShutdownHandler(qpid::sys::ShutdownHandler* handler);
- virtual qpid::framing::OutputHandler* getOutputHandler();
- virtual void send(qpid::framing::AMQFrame* frame);
- virtual void setReadTimeout(u_int16_t timeout);
- virtual void setWriteTimeout(u_int16_t timeout);
- };
+ void checkIdle(ssize_t status);
+ void writeBlock(qpid::framing::AMQDataBlock* data);
+ void writeToSocket(char* data, size_t available);
+ void setSocketTimeout();
+
+ void run();
+ void handleClosed();
+
+ public:
+ Connector(const qpid::framing::ProtocolVersion& pVersion,
+ qpid::framing::Requester& req, qpid::framing::Responder& resp,
+ bool debug = false, u_int32_t buffer_size = 1024);
+ virtual ~Connector();
+ virtual void connect(const std::string& host, int port);
+ virtual void init(qpid::framing::ProtocolInitiation* header);
+ virtual void close();
+ virtual void setInputHandler(qpid::framing::InputHandler* handler);
+ virtual void setTimeoutHandler(qpid::sys::TimeoutHandler* handler);
+ virtual void setShutdownHandler(qpid::sys::ShutdownHandler* handler);
+ virtual qpid::framing::OutputHandler* getOutputHandler();
+ virtual void send(qpid::framing::AMQFrame* frame);
+ virtual void setReadTimeout(u_int16_t timeout);
+ virtual void setWriteTimeout(u_int16_t timeout);
+};
}
}
diff --git a/cpp/lib/common/framing/AMQFrame.h b/cpp/lib/common/framing/AMQFrame.h
index a927481e77..c27de70e5a 100644
--- a/cpp/lib/common/framing/AMQFrame.h
+++ b/cpp/lib/common/framing/AMQFrame.h
@@ -21,7 +21,8 @@
* under the License.
*
*/
-/*#include <qpid/framing/amqp_methods.h>*/
+#include <boost/cast.hpp>
+
#include <amqp_types.h>
#include <AMQBody.h>
#include <AMQDataBlock.h>
@@ -50,6 +51,12 @@ class AMQFrame : virtual public AMQDataBlock
u_int16_t getChannel();
AMQBody::shared_ptr getBody();
+ /** Convenience template to cast the body to an expected type */
+ template <class T> boost::shared_ptr<T> castBody() {
+ assert(dynamic_cast<T*>(getBody().get()));
+ boost::static_pointer_cast<T>(getBody());
+ }
+
u_int32_t decodeHead(Buffer& buffer);
void decodeBody(Buffer& buffer, uint32_t size);
diff --git a/cpp/lib/common/framing/AMQRequestBody.cpp b/cpp/lib/common/framing/AMQRequestBody.cpp
index a5e6ce6974..b18671c711 100644
--- a/cpp/lib/common/framing/AMQRequestBody.cpp
+++ b/cpp/lib/common/framing/AMQRequestBody.cpp
@@ -55,4 +55,10 @@ AMQRequestBody::create(
return AMQRequestBody::shared_ptr(body);
}
+void AMQRequestBody::printPrefix(std::ostream& out) const {
+ out << "request(id=" << data.requestId << ",mark="
+ << data.responseMark << "): ";
+}
+
}} // namespace qpid::framing
+
diff --git a/cpp/lib/common/framing/AMQRequestBody.h b/cpp/lib/common/framing/AMQRequestBody.h
index 74aa398606..1a1d3db0e7 100644
--- a/cpp/lib/common/framing/AMQRequestBody.h
+++ b/cpp/lib/common/framing/AMQRequestBody.h
@@ -42,6 +42,10 @@ class AMQRequestBody : public AMQMethodBody
ResponseId responseMark;
};
+ static Data& getData(const AMQBody::shared_ptr& body) {
+ return boost::dynamic_pointer_cast<AMQRequestBody>(body)->getData();
+ }
+
static shared_ptr create(
AMQP_MethodVersionMap& versionMap, ProtocolVersion version,
Buffer& buffer);
@@ -52,6 +56,7 @@ class AMQRequestBody : public AMQMethodBody
u_int8_t type() const { return REQUEST_BODY; }
void encode(Buffer& buffer) const;
+ Data& getData() { return data; }
RequestId getRequestId() const { return data.requestId; }
void setRequestId(RequestId id) { data.requestId=id; }
ResponseId getResponseMark() const { return data.responseMark; }
@@ -59,6 +64,7 @@ class AMQRequestBody : public AMQMethodBody
protected:
static const u_int32_t baseSize() { return AMQMethodBody::baseSize()+16; }
+ void printPrefix(std::ostream& out) const;
private:
Data data;
diff --git a/cpp/lib/common/framing/AMQResponseBody.cpp b/cpp/lib/common/framing/AMQResponseBody.cpp
index 49fdea9242..c64b1325d6 100644
--- a/cpp/lib/common/framing/AMQResponseBody.cpp
+++ b/cpp/lib/common/framing/AMQResponseBody.cpp
@@ -56,5 +56,10 @@ AMQResponseBody::shared_ptr AMQResponseBody::create(
return AMQResponseBody::shared_ptr(body);
}
+void AMQResponseBody::printPrefix(std::ostream& out) const {
+ out << "response(id=" << data.responseId << ",request=" << data.requestId
+ << ",batch=" << data.batchOffset << "): ";
+}
+
}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/AMQResponseBody.h b/cpp/lib/common/framing/AMQResponseBody.h
index d7095d3da0..6528613a12 100644
--- a/cpp/lib/common/framing/AMQResponseBody.h
+++ b/cpp/lib/common/framing/AMQResponseBody.h
@@ -46,6 +46,10 @@ class AMQResponseBody : public AMQMethodBody
u_int32_t batchOffset;
};
+ static Data& getData(const AMQBody::shared_ptr& body) {
+ return boost::dynamic_pointer_cast<AMQResponseBody>(body)->getData();
+ }
+
static shared_ptr create(
AMQP_MethodVersionMap& versionMap, ProtocolVersion version,
Buffer& buffer);
@@ -57,12 +61,15 @@ class AMQResponseBody : public AMQMethodBody
u_int8_t type() const { return RESPONSE_BODY; }
void encode(Buffer& buffer) const;
+ Data& getData() { return data; }
ResponseId getResponseId() { return data.responseId; }
RequestId getRequestId() { return data.requestId; }
BatchOffset getBatchOffset() { return data.batchOffset; }
protected:
static const u_int32_t baseSize() { return AMQMethodBody::baseSize()+20; }
+ void printPrefix(std::ostream& out) const;
+
private:
Data data;
};
diff --git a/cpp/lib/common/framing/ProtocolVersionException.h b/cpp/lib/common/framing/ProtocolVersionException.h
index 4494d87064..32b5bc7ef4 100644
--- a/cpp/lib/common/framing/ProtocolVersionException.h
+++ b/cpp/lib/common/framing/ProtocolVersionException.h
@@ -27,12 +27,10 @@
#include <string>
#include <vector>
-namespace qpid
-{
-namespace framing
-{
+namespace qpid {
+namespace framing {
-class ProtocolVersionException : virtual public qpid::Exception
+class ProtocolVersionException : public qpid::Exception
{
protected:
ProtocolVersion versionFound;
@@ -49,7 +47,6 @@ public:
virtual std::string toString() const throw();
}; // class ProtocolVersionException
-} // namespace framing
-} // namespace qpid
+}} // namespace qpid::framing
#endif //ifndef _ProtocolVersionException_
diff --git a/cpp/lib/common/framing/Requester.h b/cpp/lib/common/framing/Requester.h
index e24848f98a..562ba681c1 100644
--- a/cpp/lib/common/framing/Requester.h
+++ b/cpp/lib/common/framing/Requester.h
@@ -45,7 +45,7 @@ class Requester
/** Called after processing a response. */
void processed(const AMQResponseBody::Data&);
-
+
private:
std::set<RequestId> requests; /** Sent but not responded to */
RequestId lastId;
diff --git a/gentools/templ.cpp/MethodBodyClass.h.tmpl b/gentools/templ.cpp/MethodBodyClass.h.tmpl
index 428f0f54b6..39f423dafb 100644
--- a/gentools/templ.cpp/MethodBodyClass.h.tmpl
+++ b/gentools/templ.cpp/MethodBodyClass.h.tmpl
@@ -69,7 +69,8 @@ ${mb_constructor_with_initializers}
inline void print(std::ostream& out) const
{
- out << "${CLASS}${METHOD}: ";
+ printPrefix(out);
+ out << "${CLASS}${METHOD}: ";
%{FLIST} ${mb_field_print}
}