diff options
Diffstat (limited to 'cpp/lib/broker/SessionHandlerImpl.cpp')
-rw-r--r-- | cpp/lib/broker/SessionHandlerImpl.cpp | 257 |
1 files changed, 150 insertions, 107 deletions
diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/SessionHandlerImpl.cpp index 905ac83b92..d7f6320535 100644 --- a/cpp/lib/broker/SessionHandlerImpl.cpp +++ b/cpp/lib/broker/SessionHandlerImpl.cpp @@ -19,11 +19,15 @@ * */ #include <iostream> -#include <SessionHandlerImpl.h> -#include <FanOutExchange.h> -#include <HeadersExchange.h> -#include <TopicExchange.h> -#include "assert.h" +#include <assert.h> + +#include "SessionHandlerImpl.h" + +#include "FanOutExchange.h" +#include "HeadersExchange.h" + +#include "Requester.h" +#include "Responder.h" using namespace boost; using namespace qpid::sys; @@ -42,6 +46,8 @@ SessionHandlerImpl::SessionHandlerImpl( exchanges(broker.getExchanges()), cleaner(broker.getCleaner()), settings(broker.getTimeout(), broker.getStagingThreshold()), + requester(broker.getRequester()), + responder(broker.getResponder()), basicHandler(new BasicHandlerImpl(this)), channelHandler(new ChannelHandlerImpl(this)), connectionHandler(new ConnectionHandlerImpl(this)), @@ -55,7 +61,7 @@ SessionHandlerImpl::SessionHandlerImpl( SessionHandlerImpl::~SessionHandlerImpl(){ - if (client != NULL) + if (client != NULL) delete client; } @@ -87,51 +93,87 @@ Exchange::shared_ptr SessionHandlerImpl::findExchange(const string& name){ return exchanges.get(name); } +void SessionHandlerImpl::handleMethod( + u_int16_t channel, qpid::framing::AMQBody::shared_ptr body) +{ + AMQMethodBody::shared_ptr method = + shared_polymorphic_cast<AMQMethodBody, AMQBody>(body); + try{ + method->invoke(*this, channel); + }catch(ChannelException& e){ + channels[channel]->close(); + channels.erase(channel); + client->getChannel().close( + channel, e.code, e.text, + method->amqpClassId(), method->amqpMethodId()); + }catch(ConnectionException& e){ + client->getConnection().close( + 0, e.code, e.text, method->amqpClassId(), method->amqpMethodId()); + }catch(std::exception& e){ + client->getConnection().close( + 0, 541/*internal error*/, e.what(), + method->amqpClassId(), method->amqpMethodId()); + } +} + void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){ u_int16_t channel = frame->getChannel(); AMQBody::shared_ptr body = frame->getBody(); - AMQMethodBody::shared_ptr method; - switch(body->type()) { case REQUEST_BODY: - // responder.received(frame); + responder.received(AMQRequestBody::getData(body)); + handleMethod(channel, body); + break; case RESPONSE_BODY: - // requester.received(frame); - case METHOD_BODY: // - method = dynamic_pointer_cast<AMQMethodBody, AMQBody>(body); - try{ - method->invoke(*this, channel); - }catch(ChannelException& e){ - channels[channel]->close(); - channels.erase(channel); - client->getChannel().close(channel, e.code, e.text, method->amqpClassId(), method->amqpMethodId()); - }catch(ConnectionException& e){ - client->getConnection().close(0, e.code, e.text, method->amqpClassId(), method->amqpMethodId()); - }catch(std::exception& e){ - string error(e.what()); - client->getConnection().close(0, 541/*internal error*/, error, method->amqpClassId(), method->amqpMethodId()); - } - break; - - case HEADER_BODY: - this->handleHeader(channel, dynamic_pointer_cast<AMQHeaderBody, AMQBody>(body)); + // Must process responses before marking them received. + handleMethod(channel, body); + requester.processed(AMQResponseBody::getData(body)); + break; + // TODO aconway 2007-01-15: Leftover from 0-8 support, remove. + case METHOD_BODY: + handleMethod(channel, body); + break; + case HEADER_BODY: + handleHeader( + channel, shared_polymorphic_cast<AMQHeaderBody>(body)); break; - case CONTENT_BODY: - this->handleContent(channel, dynamic_pointer_cast<AMQContentBody, AMQBody>(body)); + case CONTENT_BODY: + handleContent( + channel, shared_polymorphic_cast<AMQContentBody>(body)); break; - case HEARTBEAT_BODY: - //channel must be 0 - this->handleHeartbeat(dynamic_pointer_cast<AMQHeartbeatBody, AMQBody>(body)); + case HEARTBEAT_BODY: + assert(channel == 0); + handleHeartbeat( + shared_polymorphic_cast<AMQHeartbeatBody>(body)); break; } } +/** + * An OutputHandler that does request/response procssing before + * delgating to another OutputHandler. + */ +SessionHandlerImpl::Sender::Sender( + OutputHandler& oh, Requester& req, Responder& resp) + : out(oh), requester(req), responder(resp) +{} + +void SessionHandlerImpl::Sender::send(AMQFrame* frame) { + AMQBody::shared_ptr body = frame->getBody(); + u_int16_t type = body->type(); + if (type == REQUEST_BODY) + requester.sending(AMQRequestBody::getData(body)); + else if (type == RESPONSE_BODY) + responder.sending(AMQResponseBody::getData(body)); + out.send(frame); +} + void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* header){ - if (client == NULL) + if (client == 0) { client = new qpid::framing::AMQP_ClientProxy(context, header->getMajor(), header->getMinor()); @@ -280,7 +322,7 @@ void SessionHandlerImpl::ExchangeHandlerImpl::unbind( const string& /*routingKey*/, const qpid::framing::FieldTable& /*arguments*/ ) { - assert(0); // FIXME aconway 2007-01-04: 0-9 feature + assert(0); // FIXME aconway 2007-01-04: 0-9 feature } @@ -335,9 +377,9 @@ void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*t Queue::shared_ptr queue = parent->getQueue(queueName, channel); Exchange::shared_ptr exchange = parent->exchanges.get(exchangeName); if(exchange){ -// kpvdr - cannot use this any longer as routingKey is now const -// if(routingKey.empty() && queueName.empty()) routingKey = queue->getName(); -// exchange->bind(queue, routingKey, &arguments); + // kpvdr - cannot use this any longer as routingKey is now const + // if(routingKey.empty() && queueName.empty()) routingKey = queue->getName(); + // exchange->bind(queue, routingKey, &arguments); string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; exchange->bind(queue, exchangeRoutingKey, &arguments); if(!nowait) parent->client->getQueue().bindOk(channel); @@ -483,25 +525,25 @@ SessionHandlerImpl::QueueHandlerImpl::unbind( const string& /*routingKey*/, const qpid::framing::FieldTable& /*arguments*/ ) { - assert(0); // FIXME aconway 2007-01-04: 0-9 feature + assert(0); // FIXME aconway 2007-01-04: 0-9 feature } void SessionHandlerImpl::ChannelHandlerImpl::ok( u_int16_t /*channel*/ ) { - assert(0); // FIXME aconway 2007-01-04: 0-9 feature + assert(0); // FIXME aconway 2007-01-04: 0-9 feature } void SessionHandlerImpl::ChannelHandlerImpl::ping( u_int16_t /*channel*/ ) { - assert(0); // FIXME aconway 2007-01-04: 0-9 feature + assert(0); // FIXME aconway 2007-01-04: 0-9 feature } void SessionHandlerImpl::ChannelHandlerImpl::pong( u_int16_t /*channel*/ ) { - assert(0); // FIXME aconway 2007-01-04: 0-9 feature + assert(0); // FIXME aconway 2007-01-04: 0-9 feature } void @@ -509,148 +551,149 @@ SessionHandlerImpl::ChannelHandlerImpl::resume( u_int16_t /*channel*/, const string& /*channelId*/ ) { - assert(0); // FIXME aconway 2007-01-04: 0-9 feature + assert(0); // FIXME aconway 2007-01-04: 0-9 feature } // Message class method handlers void SessionHandlerImpl::MessageHandlerImpl::append( u_int16_t /*channel*/, - const string& /*reference*/, - const string& /*bytes*/ ) + const string& /*reference*/, + const string& /*bytes*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::cancel( u_int16_t /*channel*/, - const string& /*destination*/ ) + const string& /*destination*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/, - const string& /*reference*/, - const string& /*identifier*/ ) + const string& /*reference*/, + const string& /*identifier*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::close( u_int16_t /*channel*/, - const string& /*reference*/ ) + const string& /*reference*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::consume( u_int16_t /*channel*/, - u_int16_t /*ticket*/, - const string& /*queue*/, - const string& /*destination*/, - bool /*noLocal*/, - bool /*noAck*/, - bool /*exclusive*/, - const qpid::framing::FieldTable& /*filter*/ ) + u_int16_t /*ticket*/, + const string& /*queue*/, + const string& /*destination*/, + bool /*noLocal*/, + bool /*noAck*/, + bool /*exclusive*/, + const qpid::framing::FieldTable& /*filter*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::empty( u_int16_t /*channel*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::get( u_int16_t /*channel*/, - u_int16_t /*ticket*/, - const string& /*queue*/, - const string& /*destination*/, - bool /*noAck*/ ) + u_int16_t /*ticket*/, + const string& /*queue*/, + const string& /*destination*/, + bool /*noAck*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::offset( u_int16_t /*channel*/, - u_int64_t /*value*/ ) + u_int64_t /*value*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::ok( u_int16_t /*channel*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::open( u_int16_t /*channel*/, - const string& /*reference*/ ) + const string& /*reference*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::qos( u_int16_t /*channel*/, - u_int32_t /*prefetchSize*/, - u_int16_t /*prefetchCount*/, - bool /*global*/ ) + u_int32_t /*prefetchSize*/, + u_int16_t /*prefetchCount*/, + bool /*global*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::recover( u_int16_t /*channel*/, - bool /*requeue*/ ) + bool /*requeue*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::reject( u_int16_t /*channel*/, - u_int16_t /*code*/, - const string& /*text*/ ) + u_int16_t /*code*/, + const string& /*text*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::resume( u_int16_t /*channel*/, - const string& /*reference*/, - const string& /*identifier*/ ) + const string& /*reference*/, + const string& /*identifier*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::transfer( u_int16_t /*channel*/, - u_int16_t /*ticket*/, - const string& /*destination*/, - bool /*redelivered*/, - bool /*immediate*/, - u_int64_t /*ttl*/, - u_int8_t /*priority*/, - u_int64_t /*timestamp*/, - u_int8_t /*deliveryMode*/, - u_int64_t /*expiration*/, - const string& /*exchange*/, - const string& /*routingKey*/, - const string& /*messageId*/, - const string& /*correlationId*/, - const string& /*replyTo*/, - const string& /*contentType*/, - const string& /*contentEncoding*/, - const string& /*userId*/, - const string& /*appId*/, - const string& /*transactionId*/, - const string& /*securityToken*/, - const qpid::framing::FieldTable& /*applicationHeaders*/, - qpid::framing::Content /*body*/ ) + u_int16_t /*ticket*/, + const string& /*destination*/, + bool /*redelivered*/, + bool /*immediate*/, + u_int64_t /*ttl*/, + u_int8_t /*priority*/, + u_int64_t /*timestamp*/, + u_int8_t /*deliveryMode*/, + u_int64_t /*expiration*/, + const string& /*exchange*/, + const string& /*routingKey*/, + const string& /*messageId*/, + const string& /*correlationId*/, + const string& /*replyTo*/, + const string& /*contentType*/, + const string& /*contentEncoding*/, + const string& /*userId*/, + const string& /*appId*/, + const string& /*transactionId*/, + const string& /*securityToken*/, + const qpid::framing::FieldTable& /*applicationHeaders*/, + qpid::framing::Content /*body*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } }} + |