summaryrefslogtreecommitdiff
path: root/cpp/lib/broker/SessionHandlerImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib/broker/SessionHandlerImpl.cpp')
-rw-r--r--cpp/lib/broker/SessionHandlerImpl.cpp257
1 files changed, 150 insertions, 107 deletions
diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/SessionHandlerImpl.cpp
index 905ac83b92..d7f6320535 100644
--- a/cpp/lib/broker/SessionHandlerImpl.cpp
+++ b/cpp/lib/broker/SessionHandlerImpl.cpp
@@ -19,11 +19,15 @@
*
*/
#include <iostream>
-#include <SessionHandlerImpl.h>
-#include <FanOutExchange.h>
-#include <HeadersExchange.h>
-#include <TopicExchange.h>
-#include "assert.h"
+#include <assert.h>
+
+#include "SessionHandlerImpl.h"
+
+#include "FanOutExchange.h"
+#include "HeadersExchange.h"
+
+#include "Requester.h"
+#include "Responder.h"
using namespace boost;
using namespace qpid::sys;
@@ -42,6 +46,8 @@ SessionHandlerImpl::SessionHandlerImpl(
exchanges(broker.getExchanges()),
cleaner(broker.getCleaner()),
settings(broker.getTimeout(), broker.getStagingThreshold()),
+ requester(broker.getRequester()),
+ responder(broker.getResponder()),
basicHandler(new BasicHandlerImpl(this)),
channelHandler(new ChannelHandlerImpl(this)),
connectionHandler(new ConnectionHandlerImpl(this)),
@@ -55,7 +61,7 @@ SessionHandlerImpl::SessionHandlerImpl(
SessionHandlerImpl::~SessionHandlerImpl(){
- if (client != NULL)
+ if (client != NULL)
delete client;
}
@@ -87,51 +93,87 @@ Exchange::shared_ptr SessionHandlerImpl::findExchange(const string& name){
return exchanges.get(name);
}
+void SessionHandlerImpl::handleMethod(
+ u_int16_t channel, qpid::framing::AMQBody::shared_ptr body)
+{
+ AMQMethodBody::shared_ptr method =
+ shared_polymorphic_cast<AMQMethodBody, AMQBody>(body);
+ try{
+ method->invoke(*this, channel);
+ }catch(ChannelException& e){
+ channels[channel]->close();
+ channels.erase(channel);
+ client->getChannel().close(
+ channel, e.code, e.text,
+ method->amqpClassId(), method->amqpMethodId());
+ }catch(ConnectionException& e){
+ client->getConnection().close(
+ 0, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
+ }catch(std::exception& e){
+ client->getConnection().close(
+ 0, 541/*internal error*/, e.what(),
+ method->amqpClassId(), method->amqpMethodId());
+ }
+}
+
void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){
u_int16_t channel = frame->getChannel();
AMQBody::shared_ptr body = frame->getBody();
- AMQMethodBody::shared_ptr method;
-
switch(body->type())
{
case REQUEST_BODY:
- // responder.received(frame);
+ responder.received(AMQRequestBody::getData(body));
+ handleMethod(channel, body);
+ break;
case RESPONSE_BODY:
- // requester.received(frame);
- case METHOD_BODY: //
- method = dynamic_pointer_cast<AMQMethodBody, AMQBody>(body);
- try{
- method->invoke(*this, channel);
- }catch(ChannelException& e){
- channels[channel]->close();
- channels.erase(channel);
- client->getChannel().close(channel, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
- }catch(ConnectionException& e){
- client->getConnection().close(0, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
- }catch(std::exception& e){
- string error(e.what());
- client->getConnection().close(0, 541/*internal error*/, error, method->amqpClassId(), method->amqpMethodId());
- }
- break;
-
- case HEADER_BODY:
- this->handleHeader(channel, dynamic_pointer_cast<AMQHeaderBody, AMQBody>(body));
+ // Must process responses before marking them received.
+ handleMethod(channel, body);
+ requester.processed(AMQResponseBody::getData(body));
+ break;
+ // TODO aconway 2007-01-15: Leftover from 0-8 support, remove.
+ case METHOD_BODY:
+ handleMethod(channel, body);
+ break;
+ case HEADER_BODY:
+ handleHeader(
+ channel, shared_polymorphic_cast<AMQHeaderBody>(body));
break;
- case CONTENT_BODY:
- this->handleContent(channel, dynamic_pointer_cast<AMQContentBody, AMQBody>(body));
+ case CONTENT_BODY:
+ handleContent(
+ channel, shared_polymorphic_cast<AMQContentBody>(body));
break;
- case HEARTBEAT_BODY:
- //channel must be 0
- this->handleHeartbeat(dynamic_pointer_cast<AMQHeartbeatBody, AMQBody>(body));
+ case HEARTBEAT_BODY:
+ assert(channel == 0);
+ handleHeartbeat(
+ shared_polymorphic_cast<AMQHeartbeatBody>(body));
break;
}
}
+/**
+ * An OutputHandler that does request/response procssing before
+ * delgating to another OutputHandler.
+ */
+SessionHandlerImpl::Sender::Sender(
+ OutputHandler& oh, Requester& req, Responder& resp)
+ : out(oh), requester(req), responder(resp)
+{}
+
+void SessionHandlerImpl::Sender::send(AMQFrame* frame) {
+ AMQBody::shared_ptr body = frame->getBody();
+ u_int16_t type = body->type();
+ if (type == REQUEST_BODY)
+ requester.sending(AMQRequestBody::getData(body));
+ else if (type == RESPONSE_BODY)
+ responder.sending(AMQResponseBody::getData(body));
+ out.send(frame);
+}
+
void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* header){
- if (client == NULL)
+ if (client == 0)
{
client = new qpid::framing::AMQP_ClientProxy(context, header->getMajor(), header->getMinor());
@@ -280,7 +322,7 @@ void SessionHandlerImpl::ExchangeHandlerImpl::unbind(
const string& /*routingKey*/,
const qpid::framing::FieldTable& /*arguments*/ )
{
- assert(0); // FIXME aconway 2007-01-04: 0-9 feature
+ assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
@@ -335,9 +377,9 @@ void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*t
Queue::shared_ptr queue = parent->getQueue(queueName, channel);
Exchange::shared_ptr exchange = parent->exchanges.get(exchangeName);
if(exchange){
-// kpvdr - cannot use this any longer as routingKey is now const
-// if(routingKey.empty() && queueName.empty()) routingKey = queue->getName();
-// exchange->bind(queue, routingKey, &arguments);
+ // kpvdr - cannot use this any longer as routingKey is now const
+ // if(routingKey.empty() && queueName.empty()) routingKey = queue->getName();
+ // exchange->bind(queue, routingKey, &arguments);
string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
exchange->bind(queue, exchangeRoutingKey, &arguments);
if(!nowait) parent->client->getQueue().bindOk(channel);
@@ -483,25 +525,25 @@ SessionHandlerImpl::QueueHandlerImpl::unbind(
const string& /*routingKey*/,
const qpid::framing::FieldTable& /*arguments*/ )
{
- assert(0); // FIXME aconway 2007-01-04: 0-9 feature
+ assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
void
SessionHandlerImpl::ChannelHandlerImpl::ok( u_int16_t /*channel*/ )
{
- assert(0); // FIXME aconway 2007-01-04: 0-9 feature
+ assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
void
SessionHandlerImpl::ChannelHandlerImpl::ping( u_int16_t /*channel*/ )
{
- assert(0); // FIXME aconway 2007-01-04: 0-9 feature
+ assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
void
SessionHandlerImpl::ChannelHandlerImpl::pong( u_int16_t /*channel*/ )
{
- assert(0); // FIXME aconway 2007-01-04: 0-9 feature
+ assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
void
@@ -509,148 +551,149 @@ SessionHandlerImpl::ChannelHandlerImpl::resume(
u_int16_t /*channel*/,
const string& /*channelId*/ )
{
- assert(0); // FIXME aconway 2007-01-04: 0-9 feature
+ assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
// Message class method handlers
void
SessionHandlerImpl::MessageHandlerImpl::append( u_int16_t /*channel*/,
- const string& /*reference*/,
- const string& /*bytes*/ )
+ const string& /*reference*/,
+ const string& /*bytes*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::cancel( u_int16_t /*channel*/,
- const string& /*destination*/ )
+ const string& /*destination*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/,
- const string& /*reference*/,
- const string& /*identifier*/ )
+ const string& /*reference*/,
+ const string& /*identifier*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::close( u_int16_t /*channel*/,
- const string& /*reference*/ )
+ const string& /*reference*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::consume( u_int16_t /*channel*/,
- u_int16_t /*ticket*/,
- const string& /*queue*/,
- const string& /*destination*/,
- bool /*noLocal*/,
- bool /*noAck*/,
- bool /*exclusive*/,
- const qpid::framing::FieldTable& /*filter*/ )
+ u_int16_t /*ticket*/,
+ const string& /*queue*/,
+ const string& /*destination*/,
+ bool /*noLocal*/,
+ bool /*noAck*/,
+ bool /*exclusive*/,
+ const qpid::framing::FieldTable& /*filter*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::empty( u_int16_t /*channel*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::get( u_int16_t /*channel*/,
- u_int16_t /*ticket*/,
- const string& /*queue*/,
- const string& /*destination*/,
- bool /*noAck*/ )
+ u_int16_t /*ticket*/,
+ const string& /*queue*/,
+ const string& /*destination*/,
+ bool /*noAck*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::offset( u_int16_t /*channel*/,
- u_int64_t /*value*/ )
+ u_int64_t /*value*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::ok( u_int16_t /*channel*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::open( u_int16_t /*channel*/,
- const string& /*reference*/ )
+ const string& /*reference*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::qos( u_int16_t /*channel*/,
- u_int32_t /*prefetchSize*/,
- u_int16_t /*prefetchCount*/,
- bool /*global*/ )
+ u_int32_t /*prefetchSize*/,
+ u_int16_t /*prefetchCount*/,
+ bool /*global*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::recover( u_int16_t /*channel*/,
- bool /*requeue*/ )
+ bool /*requeue*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::reject( u_int16_t /*channel*/,
- u_int16_t /*code*/,
- const string& /*text*/ )
+ u_int16_t /*code*/,
+ const string& /*text*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::resume( u_int16_t /*channel*/,
- const string& /*reference*/,
- const string& /*identifier*/ )
+ const string& /*reference*/,
+ const string& /*identifier*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::transfer( u_int16_t /*channel*/,
- u_int16_t /*ticket*/,
- const string& /*destination*/,
- bool /*redelivered*/,
- bool /*immediate*/,
- u_int64_t /*ttl*/,
- u_int8_t /*priority*/,
- u_int64_t /*timestamp*/,
- u_int8_t /*deliveryMode*/,
- u_int64_t /*expiration*/,
- const string& /*exchange*/,
- const string& /*routingKey*/,
- const string& /*messageId*/,
- const string& /*correlationId*/,
- const string& /*replyTo*/,
- const string& /*contentType*/,
- const string& /*contentEncoding*/,
- const string& /*userId*/,
- const string& /*appId*/,
- const string& /*transactionId*/,
- const string& /*securityToken*/,
- const qpid::framing::FieldTable& /*applicationHeaders*/,
- qpid::framing::Content /*body*/ )
+ u_int16_t /*ticket*/,
+ const string& /*destination*/,
+ bool /*redelivered*/,
+ bool /*immediate*/,
+ u_int64_t /*ttl*/,
+ u_int8_t /*priority*/,
+ u_int64_t /*timestamp*/,
+ u_int8_t /*deliveryMode*/,
+ u_int64_t /*expiration*/,
+ const string& /*exchange*/,
+ const string& /*routingKey*/,
+ const string& /*messageId*/,
+ const string& /*correlationId*/,
+ const string& /*replyTo*/,
+ const string& /*contentType*/,
+ const string& /*contentEncoding*/,
+ const string& /*userId*/,
+ const string& /*appId*/,
+ const string& /*transactionId*/,
+ const string& /*securityToken*/,
+ const qpid::framing::FieldTable& /*applicationHeaders*/,
+ qpid::framing::Content /*body*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
}}
+