diff options
author | Alan Conway <aconway@apache.org> | 2007-01-15 21:56:23 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-01-15 21:56:23 +0000 |
commit | ef1469a7ea1f54f266aee8f2899b7cd0c7e07d08 (patch) | |
tree | 3b69ec6c589ff8edd628f2e218589180cbca005b | |
parent | 5aaad510dc978dc09f92c774c81255b7af6b8b68 (diff) | |
download | qpid-python-ef1469a7ea1f54f266aee8f2899b7cd0c7e07d08.tar.gz |
* Client & broker using Requester/Responder to manage request/response IDs.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@496511 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/lib/broker/Broker.cpp | 2 | ||||
-rw-r--r-- | cpp/lib/broker/Broker.h | 13 | ||||
-rw-r--r-- | cpp/lib/broker/SessionHandlerImpl.cpp | 257 | ||||
-rw-r--r-- | cpp/lib/broker/SessionHandlerImpl.h | 35 | ||||
-rw-r--r-- | cpp/lib/client/Connection.cpp | 18 | ||||
-rw-r--r-- | cpp/lib/client/Connection.h | 5 | ||||
-rw-r--r-- | cpp/lib/client/Connector.cpp | 19 | ||||
-rw-r--r-- | cpp/lib/client/Connector.h | 109 | ||||
-rw-r--r-- | cpp/lib/common/framing/AMQFrame.h | 9 | ||||
-rw-r--r-- | cpp/lib/common/framing/AMQRequestBody.cpp | 6 | ||||
-rw-r--r-- | cpp/lib/common/framing/AMQRequestBody.h | 6 | ||||
-rw-r--r-- | cpp/lib/common/framing/AMQResponseBody.cpp | 5 | ||||
-rw-r--r-- | cpp/lib/common/framing/AMQResponseBody.h | 7 | ||||
-rw-r--r-- | cpp/lib/common/framing/ProtocolVersionException.h | 11 | ||||
-rw-r--r-- | cpp/lib/common/framing/Requester.h | 2 | ||||
-rw-r--r-- | gentools/templ.cpp/MethodBodyClass.h.tmpl | 3 |
16 files changed, 321 insertions, 186 deletions
diff --git a/cpp/lib/broker/Broker.cpp b/cpp/lib/broker/Broker.cpp index 6a8b1f8538..c2117eaf23 100644 --- a/cpp/lib/broker/Broker.cpp +++ b/cpp/lib/broker/Broker.cpp @@ -23,6 +23,7 @@ #include "AMQFrame.h" #include "DirectExchange.h" +#include "TopicExchange.h" #include "FanOutExchange.h" #include "HeadersExchange.h" #include "MessageStoreModule.h" @@ -102,3 +103,4 @@ const int16_t Broker::DEFAULT_PORT(5672); }} // namespace qpid::broker + diff --git a/cpp/lib/broker/Broker.h b/cpp/lib/broker/Broker.h index f831b680e9..ad7fbb1eca 100644 --- a/cpp/lib/broker/Broker.h +++ b/cpp/lib/broker/Broker.h @@ -29,6 +29,15 @@ #include <SharedObject.h> #include <MessageStore.h> #include <AutoDelete.h> +#include "Requester.h" +#include "Responder.h" +#include <ExchangeRegistry.h> +#include <BrokerChannel.h> +#include <ConnectionToken.h> +#include <DirectExchange.h> +#include <OutputHandler.h> +#include <ProtocolInitiation.h> +#include <QueueRegistry.h> namespace qpid { namespace broker { @@ -77,6 +86,8 @@ class Broker : public qpid::sys::Runnable, u_int32_t getTimeout() { return timeout; } u_int64_t getStagingThreshold() { return stagingThreshold; } AutoDelete& getCleaner() { return cleaner; } + qpid::framing::Requester& getRequester() { return requester; } + qpid::framing::Responder& getResponder() { return responder; } private: Broker(const Configuration& config); @@ -89,6 +100,8 @@ class Broker : public qpid::sys::Runnable, u_int64_t stagingThreshold; AutoDelete cleaner; SessionHandlerFactoryImpl factory; + qpid::framing::Requester requester; + qpid::framing::Responder responder; }; }} diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/SessionHandlerImpl.cpp index 905ac83b92..d7f6320535 100644 --- a/cpp/lib/broker/SessionHandlerImpl.cpp +++ b/cpp/lib/broker/SessionHandlerImpl.cpp @@ -19,11 +19,15 @@ * */ #include <iostream> -#include <SessionHandlerImpl.h> -#include <FanOutExchange.h> -#include <HeadersExchange.h> -#include <TopicExchange.h> -#include "assert.h" +#include <assert.h> + +#include "SessionHandlerImpl.h" + +#include "FanOutExchange.h" +#include "HeadersExchange.h" + +#include "Requester.h" +#include "Responder.h" using namespace boost; using namespace qpid::sys; @@ -42,6 +46,8 @@ SessionHandlerImpl::SessionHandlerImpl( exchanges(broker.getExchanges()), cleaner(broker.getCleaner()), settings(broker.getTimeout(), broker.getStagingThreshold()), + requester(broker.getRequester()), + responder(broker.getResponder()), basicHandler(new BasicHandlerImpl(this)), channelHandler(new ChannelHandlerImpl(this)), connectionHandler(new ConnectionHandlerImpl(this)), @@ -55,7 +61,7 @@ SessionHandlerImpl::SessionHandlerImpl( SessionHandlerImpl::~SessionHandlerImpl(){ - if (client != NULL) + if (client != NULL) delete client; } @@ -87,51 +93,87 @@ Exchange::shared_ptr SessionHandlerImpl::findExchange(const string& name){ return exchanges.get(name); } +void SessionHandlerImpl::handleMethod( + u_int16_t channel, qpid::framing::AMQBody::shared_ptr body) +{ + AMQMethodBody::shared_ptr method = + shared_polymorphic_cast<AMQMethodBody, AMQBody>(body); + try{ + method->invoke(*this, channel); + }catch(ChannelException& e){ + channels[channel]->close(); + channels.erase(channel); + client->getChannel().close( + channel, e.code, e.text, + method->amqpClassId(), method->amqpMethodId()); + }catch(ConnectionException& e){ + client->getConnection().close( + 0, e.code, e.text, method->amqpClassId(), method->amqpMethodId()); + }catch(std::exception& e){ + client->getConnection().close( + 0, 541/*internal error*/, e.what(), + method->amqpClassId(), method->amqpMethodId()); + } +} + void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){ u_int16_t channel = frame->getChannel(); AMQBody::shared_ptr body = frame->getBody(); - AMQMethodBody::shared_ptr method; - switch(body->type()) { case REQUEST_BODY: - // responder.received(frame); + responder.received(AMQRequestBody::getData(body)); + handleMethod(channel, body); + break; case RESPONSE_BODY: - // requester.received(frame); - case METHOD_BODY: // - method = dynamic_pointer_cast<AMQMethodBody, AMQBody>(body); - try{ - method->invoke(*this, channel); - }catch(ChannelException& e){ - channels[channel]->close(); - channels.erase(channel); - client->getChannel().close(channel, e.code, e.text, method->amqpClassId(), method->amqpMethodId()); - }catch(ConnectionException& e){ - client->getConnection().close(0, e.code, e.text, method->amqpClassId(), method->amqpMethodId()); - }catch(std::exception& e){ - string error(e.what()); - client->getConnection().close(0, 541/*internal error*/, error, method->amqpClassId(), method->amqpMethodId()); - } - break; - - case HEADER_BODY: - this->handleHeader(channel, dynamic_pointer_cast<AMQHeaderBody, AMQBody>(body)); + // Must process responses before marking them received. + handleMethod(channel, body); + requester.processed(AMQResponseBody::getData(body)); + break; + // TODO aconway 2007-01-15: Leftover from 0-8 support, remove. + case METHOD_BODY: + handleMethod(channel, body); + break; + case HEADER_BODY: + handleHeader( + channel, shared_polymorphic_cast<AMQHeaderBody>(body)); break; - case CONTENT_BODY: - this->handleContent(channel, dynamic_pointer_cast<AMQContentBody, AMQBody>(body)); + case CONTENT_BODY: + handleContent( + channel, shared_polymorphic_cast<AMQContentBody>(body)); break; - case HEARTBEAT_BODY: - //channel must be 0 - this->handleHeartbeat(dynamic_pointer_cast<AMQHeartbeatBody, AMQBody>(body)); + case HEARTBEAT_BODY: + assert(channel == 0); + handleHeartbeat( + shared_polymorphic_cast<AMQHeartbeatBody>(body)); break; } } +/** + * An OutputHandler that does request/response procssing before + * delgating to another OutputHandler. + */ +SessionHandlerImpl::Sender::Sender( + OutputHandler& oh, Requester& req, Responder& resp) + : out(oh), requester(req), responder(resp) +{} + +void SessionHandlerImpl::Sender::send(AMQFrame* frame) { + AMQBody::shared_ptr body = frame->getBody(); + u_int16_t type = body->type(); + if (type == REQUEST_BODY) + requester.sending(AMQRequestBody::getData(body)); + else if (type == RESPONSE_BODY) + responder.sending(AMQResponseBody::getData(body)); + out.send(frame); +} + void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* header){ - if (client == NULL) + if (client == 0) { client = new qpid::framing::AMQP_ClientProxy(context, header->getMajor(), header->getMinor()); @@ -280,7 +322,7 @@ void SessionHandlerImpl::ExchangeHandlerImpl::unbind( const string& /*routingKey*/, const qpid::framing::FieldTable& /*arguments*/ ) { - assert(0); // FIXME aconway 2007-01-04: 0-9 feature + assert(0); // FIXME aconway 2007-01-04: 0-9 feature } @@ -335,9 +377,9 @@ void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*t Queue::shared_ptr queue = parent->getQueue(queueName, channel); Exchange::shared_ptr exchange = parent->exchanges.get(exchangeName); if(exchange){ -// kpvdr - cannot use this any longer as routingKey is now const -// if(routingKey.empty() && queueName.empty()) routingKey = queue->getName(); -// exchange->bind(queue, routingKey, &arguments); + // kpvdr - cannot use this any longer as routingKey is now const + // if(routingKey.empty() && queueName.empty()) routingKey = queue->getName(); + // exchange->bind(queue, routingKey, &arguments); string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; exchange->bind(queue, exchangeRoutingKey, &arguments); if(!nowait) parent->client->getQueue().bindOk(channel); @@ -483,25 +525,25 @@ SessionHandlerImpl::QueueHandlerImpl::unbind( const string& /*routingKey*/, const qpid::framing::FieldTable& /*arguments*/ ) { - assert(0); // FIXME aconway 2007-01-04: 0-9 feature + assert(0); // FIXME aconway 2007-01-04: 0-9 feature } void SessionHandlerImpl::ChannelHandlerImpl::ok( u_int16_t /*channel*/ ) { - assert(0); // FIXME aconway 2007-01-04: 0-9 feature + assert(0); // FIXME aconway 2007-01-04: 0-9 feature } void SessionHandlerImpl::ChannelHandlerImpl::ping( u_int16_t /*channel*/ ) { - assert(0); // FIXME aconway 2007-01-04: 0-9 feature + assert(0); // FIXME aconway 2007-01-04: 0-9 feature } void SessionHandlerImpl::ChannelHandlerImpl::pong( u_int16_t /*channel*/ ) { - assert(0); // FIXME aconway 2007-01-04: 0-9 feature + assert(0); // FIXME aconway 2007-01-04: 0-9 feature } void @@ -509,148 +551,149 @@ SessionHandlerImpl::ChannelHandlerImpl::resume( u_int16_t /*channel*/, const string& /*channelId*/ ) { - assert(0); // FIXME aconway 2007-01-04: 0-9 feature + assert(0); // FIXME aconway 2007-01-04: 0-9 feature } // Message class method handlers void SessionHandlerImpl::MessageHandlerImpl::append( u_int16_t /*channel*/, - const string& /*reference*/, - const string& /*bytes*/ ) + const string& /*reference*/, + const string& /*bytes*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::cancel( u_int16_t /*channel*/, - const string& /*destination*/ ) + const string& /*destination*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/, - const string& /*reference*/, - const string& /*identifier*/ ) + const string& /*reference*/, + const string& /*identifier*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::close( u_int16_t /*channel*/, - const string& /*reference*/ ) + const string& /*reference*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::consume( u_int16_t /*channel*/, - u_int16_t /*ticket*/, - const string& /*queue*/, - const string& /*destination*/, - bool /*noLocal*/, - bool /*noAck*/, - bool /*exclusive*/, - const qpid::framing::FieldTable& /*filter*/ ) + u_int16_t /*ticket*/, + const string& /*queue*/, + const string& /*destination*/, + bool /*noLocal*/, + bool /*noAck*/, + bool /*exclusive*/, + const qpid::framing::FieldTable& /*filter*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::empty( u_int16_t /*channel*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::get( u_int16_t /*channel*/, - u_int16_t /*ticket*/, - const string& /*queue*/, - const string& /*destination*/, - bool /*noAck*/ ) + u_int16_t /*ticket*/, + const string& /*queue*/, + const string& /*destination*/, + bool /*noAck*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::offset( u_int16_t /*channel*/, - u_int64_t /*value*/ ) + u_int64_t /*value*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::ok( u_int16_t /*channel*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::open( u_int16_t /*channel*/, - const string& /*reference*/ ) + const string& /*reference*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::qos( u_int16_t /*channel*/, - u_int32_t /*prefetchSize*/, - u_int16_t /*prefetchCount*/, - bool /*global*/ ) + u_int32_t /*prefetchSize*/, + u_int16_t /*prefetchCount*/, + bool /*global*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::recover( u_int16_t /*channel*/, - bool /*requeue*/ ) + bool /*requeue*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::reject( u_int16_t /*channel*/, - u_int16_t /*code*/, - const string& /*text*/ ) + u_int16_t /*code*/, + const string& /*text*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::resume( u_int16_t /*channel*/, - const string& /*reference*/, - const string& /*identifier*/ ) + const string& /*reference*/, + const string& /*identifier*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::transfer( u_int16_t /*channel*/, - u_int16_t /*ticket*/, - const string& /*destination*/, - bool /*redelivered*/, - bool /*immediate*/, - u_int64_t /*ttl*/, - u_int8_t /*priority*/, - u_int64_t /*timestamp*/, - u_int8_t /*deliveryMode*/, - u_int64_t /*expiration*/, - const string& /*exchange*/, - const string& /*routingKey*/, - const string& /*messageId*/, - const string& /*correlationId*/, - const string& /*replyTo*/, - const string& /*contentType*/, - const string& /*contentEncoding*/, - const string& /*userId*/, - const string& /*appId*/, - const string& /*transactionId*/, - const string& /*securityToken*/, - const qpid::framing::FieldTable& /*applicationHeaders*/, - qpid::framing::Content /*body*/ ) + u_int16_t /*ticket*/, + const string& /*destination*/, + bool /*redelivered*/, + bool /*immediate*/, + u_int64_t /*ttl*/, + u_int8_t /*priority*/, + u_int64_t /*timestamp*/, + u_int8_t /*deliveryMode*/, + u_int64_t /*expiration*/, + const string& /*exchange*/, + const string& /*routingKey*/, + const string& /*messageId*/, + const string& /*correlationId*/, + const string& /*replyTo*/, + const string& /*contentType*/, + const string& /*contentEncoding*/, + const string& /*userId*/, + const string& /*appId*/, + const string& /*transactionId*/, + const string& /*securityToken*/, + const qpid::framing::FieldTable& /*applicationHeaders*/, + qpid::framing::Content /*body*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } }} + diff --git a/cpp/lib/broker/SessionHandlerImpl.h b/cpp/lib/broker/SessionHandlerImpl.h index 08b05a11b6..070bd1266e 100644 --- a/cpp/lib/broker/SessionHandlerImpl.h +++ b/cpp/lib/broker/SessionHandlerImpl.h @@ -24,28 +24,20 @@ #include <map> #include <sstream> #include <vector> -#include <exception> + #include <AMQFrame.h> #include <AMQP_ClientProxy.h> #include <AMQP_ServerOperations.h> -#include <AutoDelete.h> -#include <ExchangeRegistry.h> -#include <BrokerChannel.h> -#include <ConnectionToken.h> -#include <DirectExchange.h> -#include <OutputHandler.h> -#include <ProtocolInitiation.h> -#include <QueueRegistry.h> #include <sys/SessionContext.h> #include <sys/SessionHandler.h> #include <sys/TimeoutHandler.h> -#include <TopicExchange.h> #include "Broker.h" +#include "Exception.h" namespace qpid { namespace broker { -struct ChannelException : public std::exception { +struct ChannelException : public qpid::Exception { u_int16_t code; string text; ChannelException(u_int16_t _code, string _text) : code(_code), text(_text) {} @@ -53,7 +45,7 @@ struct ChannelException : public std::exception { const char* what() const throw() { return text.c_str(); } }; -struct ConnectionException : public std::exception { +struct ConnectionException : public qpid::Exception { u_int16_t code; string text; ConnectionException(u_int16_t _code, string _text) : code(_code), text(_text) {} @@ -75,13 +67,25 @@ class SessionHandlerImpl : public qpid::sys::SessionHandler, { typedef std::map<u_int16_t, Channel*>::iterator channel_iterator; typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; - + class Sender : public qpid::framing::OutputHandler { + public: + Sender(qpid::framing::OutputHandler&, + qpid::framing::Requester&, qpid::framing::Responder&); + void send(qpid::framing::AMQFrame* frame); + private: + OutputHandler& out; + qpid::framing::Requester& requester; + qpid::framing::Responder& responder; + }; + qpid::sys::SessionContext* context; qpid::framing::AMQP_ClientProxy* client; QueueRegistry& queues; ExchangeRegistry& exchanges; AutoDelete& cleaner; Settings settings; + qpid::framing::Requester& requester; + qpid::framing::Responder& responder; std::auto_ptr<BasicHandler> basicHandler; std::auto_ptr<ChannelHandler> channelHandler; std::auto_ptr<ConnectionHandler> connectionHandler; @@ -98,6 +102,7 @@ class SessionHandlerImpl : public qpid::sys::SessionHandler, void handleHeader(u_int16_t channel, qpid::framing::AMQHeaderBody::shared_ptr body); void handleContent(u_int16_t channel, qpid::framing::AMQContentBody::shared_ptr body); + void handleMethod(u_int16_t channel, qpid::framing::AMQBody::shared_ptr body); void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body); Channel* getChannel(u_int16_t channel); @@ -371,8 +376,6 @@ class SessionHandlerImpl : public qpid::sys::SessionHandler, virtual TunnelHandler* getTunnelHandler(){ throw ConnectionException(540, "Tunnel class not implemented"); } }; -} -} - +}} #endif diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp index ad8aa1d0dd..10a0b50aad 100644 --- a/cpp/lib/client/Connection.cpp +++ b/cpp/lib/client/Connection.cpp @@ -32,10 +32,14 @@ using namespace qpid::sys; u_int16_t Connection::channelIdCounter; -Connection::Connection( bool debug, u_int32_t _max_frame_size, qpid::framing::ProtocolVersion* _version) : max_frame_size(_max_frame_size), closed(true), +Connection::Connection( + bool debug, u_int32_t _max_frame_size, + qpid::framing::ProtocolVersion* _version +) : max_frame_size(_max_frame_size), closed(true), version(_version->getMajor(),_version->getMinor()) { - connector = new Connector(version, debug, _max_frame_size); + connector = new Connector( + version, requester, responder, debug, _max_frame_size); } Connection::~Connection(){ @@ -152,6 +156,16 @@ void Connection::removeChannel(Channel* channel){ } void Connection::received(AMQFrame* frame){ + AMQBody::shared_ptr body = frame->getBody(); + u_int8_t type = body->type(); + if (type == REQUEST_BODY) + responder.received(AMQRequestBody::getData(body)); + handleFrame(frame); + if (type == RESPONSE_BODY) + requester.processed(AMQResponseBody::getData(body)); +} + +void Connection::handleFrame(AMQFrame* frame){ u_int16_t channelId = frame->getChannel(); if(channelId == 0){ diff --git a/cpp/lib/client/Connection.h b/cpp/lib/client/Connection.h index 37e65e6099..21e2fb90a2 100644 --- a/cpp/lib/client/Connection.h +++ b/cpp/lib/client/Connection.h @@ -37,6 +37,8 @@ #include <ClientQueue.h> #include <ResponseHandler.h> #include <AMQP_HighestVersion.h> +#include "Requester.h" +#include "Responder.h" namespace qpid { @@ -79,6 +81,8 @@ namespace client { ResponseHandler responses; volatile bool closed; qpid::framing::ProtocolVersion version; + qpid::framing::Requester requester; + qpid::framing::Responder responder; void channelException(Channel* channel, qpid::framing::AMQMethodBody* body, QpidError& e); void error(int code, const std::string& msg, int classid = 0, int methodid = 0); @@ -89,6 +93,7 @@ namespace client { virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body); virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body); virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body); + void handleFrame(qpid::framing::AMQFrame* frame); public: /** diff --git a/cpp/lib/client/Connector.cpp b/cpp/lib/client/Connector.cpp index b34e66fd94..d05540ba32 100644 --- a/cpp/lib/client/Connector.cpp +++ b/cpp/lib/client/Connector.cpp @@ -22,13 +22,17 @@ #include <QpidError.h> #include <sys/Time.h> #include "Connector.h" +#include "Requester.h" +#include "Responder.h" using namespace qpid::sys; using namespace qpid::client; using namespace qpid::framing; using qpid::QpidError; -Connector::Connector(const qpid::framing::ProtocolVersion& pVersion, bool _debug, u_int32_t buffer_size) : +Connector::Connector(const qpid::framing::ProtocolVersion& pVersion, + Requester& req, Responder& resp, + bool _debug, u_int32_t buffer_size) : debug(_debug), receive_buffer_size(buffer_size), send_buffer_size(buffer_size), @@ -40,7 +44,10 @@ Connector::Connector(const qpid::framing::ProtocolVersion& pVersion, bool _debug timeoutHandler(0), shutdownHandler(0), inbuf(receive_buffer_size), - outbuf(send_buffer_size){ } + outbuf(send_buffer_size), + requester(req), + responder(resp) +{ } Connector::~Connector(){ } @@ -75,7 +82,13 @@ OutputHandler* Connector::getOutputHandler(){ } void Connector::send(AMQFrame* frame){ - writeBlock(frame); + AMQBody::shared_ptr body = frame->getBody(); + u_int8_t type = body->type(); + if (type == REQUEST_BODY) + requester.sending(AMQRequestBody::getData(body)); + else if (type == RESPONSE_BODY) + responder.sending(AMQResponseBody::getData(body)); + writeBlock(frame); if(debug) std::cout << "SENT: " << *frame << std::endl; delete frame; } diff --git a/cpp/lib/client/Connector.h b/cpp/lib/client/Connector.h index f9e50f3216..02926b2bdb 100644 --- a/cpp/lib/client/Connector.h +++ b/cpp/lib/client/Connector.h @@ -34,60 +34,73 @@ #include <sys/Socket.h> namespace qpid { + +namespace framing { + +class Requester; +class Responder; + +} // namespace framing + namespace client { - class Connector : public qpid::framing::OutputHandler, - private qpid::sys::Runnable - { - const bool debug; - const int receive_buffer_size; - const int send_buffer_size; - qpid::framing::ProtocolVersion version; - - bool closed; - - int64_t lastIn; - int64_t lastOut; - int64_t timeout; - u_int32_t idleIn; - u_int32_t idleOut; - - qpid::sys::TimeoutHandler* timeoutHandler; - qpid::sys::ShutdownHandler* shutdownHandler; - qpid::framing::InputHandler* input; - qpid::framing::InitiationHandler* initialiser; - qpid::framing::OutputHandler* output; +class Connector : public qpid::framing::OutputHandler, + private qpid::sys::Runnable +{ + const bool debug; + const int receive_buffer_size; + const int send_buffer_size; + qpid::framing::ProtocolVersion version; + + bool closed; + + int64_t lastIn; + int64_t lastOut; + int64_t timeout; + u_int32_t idleIn; + u_int32_t idleOut; + + qpid::sys::TimeoutHandler* timeoutHandler; + qpid::sys::ShutdownHandler* shutdownHandler; + qpid::framing::InputHandler* input; + qpid::framing::InitiationHandler* initialiser; + qpid::framing::OutputHandler* output; - qpid::framing::Buffer inbuf; - qpid::framing::Buffer outbuf; + qpid::framing::Buffer inbuf; + qpid::framing::Buffer outbuf; + + qpid::sys::Mutex writeLock; + qpid::sys::Thread receiver; - qpid::sys::Mutex writeLock; - qpid::sys::Thread receiver; + qpid::sys::Socket socket; - qpid::sys::Socket socket; + qpid::framing::Requester& requester; + qpid::framing::Responder& responder; - void checkIdle(ssize_t status); - void writeBlock(qpid::framing::AMQDataBlock* data); - void writeToSocket(char* data, size_t available); - void setSocketTimeout(); - - void run(); - void handleClosed(); - - public: - Connector(const qpid::framing::ProtocolVersion& pVersion, bool debug = false, u_int32_t buffer_size = 1024); - virtual ~Connector(); - virtual void connect(const std::string& host, int port); - virtual void init(qpid::framing::ProtocolInitiation* header); - virtual void close(); - virtual void setInputHandler(qpid::framing::InputHandler* handler); - virtual void setTimeoutHandler(qpid::sys::TimeoutHandler* handler); - virtual void setShutdownHandler(qpid::sys::ShutdownHandler* handler); - virtual qpid::framing::OutputHandler* getOutputHandler(); - virtual void send(qpid::framing::AMQFrame* frame); - virtual void setReadTimeout(u_int16_t timeout); - virtual void setWriteTimeout(u_int16_t timeout); - }; + void checkIdle(ssize_t status); + void writeBlock(qpid::framing::AMQDataBlock* data); + void writeToSocket(char* data, size_t available); + void setSocketTimeout(); + + void run(); + void handleClosed(); + + public: + Connector(const qpid::framing::ProtocolVersion& pVersion, + qpid::framing::Requester& req, qpid::framing::Responder& resp, + bool debug = false, u_int32_t buffer_size = 1024); + virtual ~Connector(); + virtual void connect(const std::string& host, int port); + virtual void init(qpid::framing::ProtocolInitiation* header); + virtual void close(); + virtual void setInputHandler(qpid::framing::InputHandler* handler); + virtual void setTimeoutHandler(qpid::sys::TimeoutHandler* handler); + virtual void setShutdownHandler(qpid::sys::ShutdownHandler* handler); + virtual qpid::framing::OutputHandler* getOutputHandler(); + virtual void send(qpid::framing::AMQFrame* frame); + virtual void setReadTimeout(u_int16_t timeout); + virtual void setWriteTimeout(u_int16_t timeout); +}; } } diff --git a/cpp/lib/common/framing/AMQFrame.h b/cpp/lib/common/framing/AMQFrame.h index a927481e77..c27de70e5a 100644 --- a/cpp/lib/common/framing/AMQFrame.h +++ b/cpp/lib/common/framing/AMQFrame.h @@ -21,7 +21,8 @@ * under the License. * */ -/*#include <qpid/framing/amqp_methods.h>*/ +#include <boost/cast.hpp> + #include <amqp_types.h> #include <AMQBody.h> #include <AMQDataBlock.h> @@ -50,6 +51,12 @@ class AMQFrame : virtual public AMQDataBlock u_int16_t getChannel(); AMQBody::shared_ptr getBody(); + /** Convenience template to cast the body to an expected type */ + template <class T> boost::shared_ptr<T> castBody() { + assert(dynamic_cast<T*>(getBody().get())); + boost::static_pointer_cast<T>(getBody()); + } + u_int32_t decodeHead(Buffer& buffer); void decodeBody(Buffer& buffer, uint32_t size); diff --git a/cpp/lib/common/framing/AMQRequestBody.cpp b/cpp/lib/common/framing/AMQRequestBody.cpp index a5e6ce6974..b18671c711 100644 --- a/cpp/lib/common/framing/AMQRequestBody.cpp +++ b/cpp/lib/common/framing/AMQRequestBody.cpp @@ -55,4 +55,10 @@ AMQRequestBody::create( return AMQRequestBody::shared_ptr(body); } +void AMQRequestBody::printPrefix(std::ostream& out) const { + out << "request(id=" << data.requestId << ",mark=" + << data.responseMark << "): "; +} + }} // namespace qpid::framing + diff --git a/cpp/lib/common/framing/AMQRequestBody.h b/cpp/lib/common/framing/AMQRequestBody.h index 74aa398606..1a1d3db0e7 100644 --- a/cpp/lib/common/framing/AMQRequestBody.h +++ b/cpp/lib/common/framing/AMQRequestBody.h @@ -42,6 +42,10 @@ class AMQRequestBody : public AMQMethodBody ResponseId responseMark; }; + static Data& getData(const AMQBody::shared_ptr& body) { + return boost::dynamic_pointer_cast<AMQRequestBody>(body)->getData(); + } + static shared_ptr create( AMQP_MethodVersionMap& versionMap, ProtocolVersion version, Buffer& buffer); @@ -52,6 +56,7 @@ class AMQRequestBody : public AMQMethodBody u_int8_t type() const { return REQUEST_BODY; } void encode(Buffer& buffer) const; + Data& getData() { return data; } RequestId getRequestId() const { return data.requestId; } void setRequestId(RequestId id) { data.requestId=id; } ResponseId getResponseMark() const { return data.responseMark; } @@ -59,6 +64,7 @@ class AMQRequestBody : public AMQMethodBody protected: static const u_int32_t baseSize() { return AMQMethodBody::baseSize()+16; } + void printPrefix(std::ostream& out) const; private: Data data; diff --git a/cpp/lib/common/framing/AMQResponseBody.cpp b/cpp/lib/common/framing/AMQResponseBody.cpp index 49fdea9242..c64b1325d6 100644 --- a/cpp/lib/common/framing/AMQResponseBody.cpp +++ b/cpp/lib/common/framing/AMQResponseBody.cpp @@ -56,5 +56,10 @@ AMQResponseBody::shared_ptr AMQResponseBody::create( return AMQResponseBody::shared_ptr(body); } +void AMQResponseBody::printPrefix(std::ostream& out) const { + out << "response(id=" << data.responseId << ",request=" << data.requestId + << ",batch=" << data.batchOffset << "): "; +} + }} // namespace qpid::framing diff --git a/cpp/lib/common/framing/AMQResponseBody.h b/cpp/lib/common/framing/AMQResponseBody.h index d7095d3da0..6528613a12 100644 --- a/cpp/lib/common/framing/AMQResponseBody.h +++ b/cpp/lib/common/framing/AMQResponseBody.h @@ -46,6 +46,10 @@ class AMQResponseBody : public AMQMethodBody u_int32_t batchOffset; }; + static Data& getData(const AMQBody::shared_ptr& body) { + return boost::dynamic_pointer_cast<AMQResponseBody>(body)->getData(); + } + static shared_ptr create( AMQP_MethodVersionMap& versionMap, ProtocolVersion version, Buffer& buffer); @@ -57,12 +61,15 @@ class AMQResponseBody : public AMQMethodBody u_int8_t type() const { return RESPONSE_BODY; } void encode(Buffer& buffer) const; + Data& getData() { return data; } ResponseId getResponseId() { return data.responseId; } RequestId getRequestId() { return data.requestId; } BatchOffset getBatchOffset() { return data.batchOffset; } protected: static const u_int32_t baseSize() { return AMQMethodBody::baseSize()+20; } + void printPrefix(std::ostream& out) const; + private: Data data; }; diff --git a/cpp/lib/common/framing/ProtocolVersionException.h b/cpp/lib/common/framing/ProtocolVersionException.h index 4494d87064..32b5bc7ef4 100644 --- a/cpp/lib/common/framing/ProtocolVersionException.h +++ b/cpp/lib/common/framing/ProtocolVersionException.h @@ -27,12 +27,10 @@ #include <string> #include <vector> -namespace qpid -{ -namespace framing -{ +namespace qpid { +namespace framing { -class ProtocolVersionException : virtual public qpid::Exception +class ProtocolVersionException : public qpid::Exception { protected: ProtocolVersion versionFound; @@ -49,7 +47,6 @@ public: virtual std::string toString() const throw(); }; // class ProtocolVersionException -} // namespace framing -} // namespace qpid +}} // namespace qpid::framing #endif //ifndef _ProtocolVersionException_ diff --git a/cpp/lib/common/framing/Requester.h b/cpp/lib/common/framing/Requester.h index e24848f98a..562ba681c1 100644 --- a/cpp/lib/common/framing/Requester.h +++ b/cpp/lib/common/framing/Requester.h @@ -45,7 +45,7 @@ class Requester /** Called after processing a response. */ void processed(const AMQResponseBody::Data&); - + private: std::set<RequestId> requests; /** Sent but not responded to */ RequestId lastId; diff --git a/gentools/templ.cpp/MethodBodyClass.h.tmpl b/gentools/templ.cpp/MethodBodyClass.h.tmpl index 428f0f54b6..39f423dafb 100644 --- a/gentools/templ.cpp/MethodBodyClass.h.tmpl +++ b/gentools/templ.cpp/MethodBodyClass.h.tmpl @@ -69,7 +69,8 @@ ${mb_constructor_with_initializers} inline void print(std::ostream& out) const { - out << "${CLASS}${METHOD}: "; + printPrefix(out); + out << "${CLASS}${METHOD}: "; %{FLIST} ${mb_field_print} } |