diff options
author | Alan Conway <aconway@apache.org> | 2007-01-18 06:27:50 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-01-18 06:27:50 +0000 |
commit | 9e8a7c77a94a92c6cf92cf60be508817f0778040 (patch) | |
tree | 0ff1882596aae1da3fd6bd6c2b4e13a3b9ae5de7 /cpp/lib/broker/Connection.cpp | |
parent | 762cba1e7db15cb3c9e987a9edcbf3106c7244cb (diff) | |
download | qpid-python-9e8a7c77a94a92c6cf92cf60be508817f0778040.tar.gz |
There are a ton of FIXMES and request/response IDs are not yet working fully
but all tests are passing.
* broker::Broker: Removed requester/responder from broker.
* framing::BodyHandler: added Requester/Responder to BodyHandler, becomes
the base class for channel adapters in broker and client.
* broker::BrokerAdapter: Inherit BodyHandler, wraps a broker::Channel.
Hide private *HandlerImpl detail classes in BodyHandler.cpp.
* broker::Connection: Requester/Responder/Adapter now per-channel.
Connection channel map replaced with adapter map of BrokerAdapters.
handle* functions moved to BrokerAdapter.
All methods now handled by a BrokerAdapter for the relevant channel.
ChannelHandlerImpl is repsonsible for checking that
- No method on a non-0 channel is processed before open()
- Channel 0 methods only happen on channel 0 and similar for non-zero methods
Checks are not yet complete (see FIXMES)
* client::ResponseHandler: fix for client hang if broker crashs.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497319 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/broker/Connection.cpp')
-rw-r--r-- | cpp/lib/broker/Connection.cpp | 135 |
1 files changed, 20 insertions, 115 deletions
diff --git a/cpp/lib/broker/Connection.cpp b/cpp/lib/broker/Connection.cpp index c220fb9092..3db5bcd074 100644 --- a/cpp/lib/broker/Connection.cpp +++ b/cpp/lib/broker/Connection.cpp @@ -23,10 +23,6 @@ #include "Connection.h" -// TODO aconway 2007-01-16: move to channel. -#include "Requester.h" -#include "Responder.h" - using namespace boost; using namespace qpid::sys; using namespace qpid::framing; @@ -36,9 +32,6 @@ namespace qpid { namespace broker { Connection::Connection(SessionContext* context_, Broker& broker_) : - adapter(*this), - requester(broker.getRequester()), - responder(broker.getResponder()), context(context_), framemax(65536), heartbeat(0), @@ -65,89 +58,15 @@ Exchange::shared_ptr Connection::findExchange(const string& name){ return broker.getExchanges().get(name); } -void Connection::handleMethod( - u_int16_t channel, qpid::framing::AMQBody::shared_ptr body) -{ - AMQMethodBody::shared_ptr method = - shared_polymorphic_cast<AMQMethodBody, AMQBody>(body); - try{ - method->invoke(adapter, channel); - }catch(ChannelException& e){ - closeChannel(channel); - client->getChannel().close( - channel, e.code, e.toString(), - method->amqpClassId(), method->amqpMethodId()); - }catch(ConnectionException& e){ - client->getConnection().close( - 0, e.code, e.toString(), - method->amqpClassId(), method->amqpMethodId()); - }catch(std::exception& e){ - client->getConnection().close( - 0, 541/*internal error*/, e.what(), - method->amqpClassId(), method->amqpMethodId()); - } -} void Connection::received(qpid::framing::AMQFrame* frame){ - u_int16_t channel = frame->getChannel(); - AMQBody::shared_ptr body = frame->getBody(); - switch(body->type()) - { - case REQUEST_BODY: - responder.received(AMQRequestBody::getData(body)); - handleMethod(channel, body); - break; - case RESPONSE_BODY: - // Must process responses before marking them received. - handleMethod(channel, body); - requester.processed(AMQResponseBody::getData(body)); - break; - // TODO aconway 2007-01-15: Leftover from 0-8 support, remove. - case METHOD_BODY: - handleMethod(channel, body); - break; - case HEADER_BODY: - handleHeader( - channel, shared_polymorphic_cast<AMQHeaderBody>(body)); - break; - - case CONTENT_BODY: - handleContent( - channel, shared_polymorphic_cast<AMQContentBody>(body)); - break; - - case HEARTBEAT_BODY: - assert(channel == 0); - handleHeartbeat( - shared_polymorphic_cast<AMQHeartbeatBody>(body)); - break; - } -} - -/** - * An OutputHandler that does request/response procssing before - * delgating to another OutputHandler. - */ -Connection::Sender::Sender( - OutputHandler& oh, Requester& req, Responder& resp) - : out(oh), requester(req), responder(resp) -{} - -void Connection::Sender::send(AMQFrame* frame) { - AMQBody::shared_ptr body = frame->getBody(); - u_int16_t type = body->type(); - if (type == REQUEST_BODY) - requester.sending(AMQRequestBody::getData(body)); - else if (type == RESPONSE_BODY) - responder.sending(AMQResponseBody::getData(body)); - out.send(frame); + getAdapter(frame->getChannel()).handleBody(frame->getBody()); } void Connection::initiated(qpid::framing::ProtocolInitiation* header) { if (client.get()) - // TODO aconway 2007-01-16: correct code. + // TODO aconway 2007-01-16: correct error code. throw ConnectionException(0, "Connection initiated twice"); - client.reset(new qpid::framing::AMQP_ClientProxy( context, header->getMajor(), header->getMinor())); FieldTable properties; @@ -159,7 +78,6 @@ void Connection::initiated(qpid::framing::ProtocolInitiation* header) { mechanisms, locales); } - void Connection::idleOut(){} void Connection::idleIn(){} @@ -177,42 +95,29 @@ void Connection::closed(){ } } -// TODO aconway 2007-01-16: colapse these. -void Connection::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){ - getChannel(channel).handleHeader(body); -} - -void Connection::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){ - getChannel(channel).handleContent(body); -} - -void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ - std::cout << "Connection::handleHeartbeat()" << std::endl; +void Connection::closeChannel(u_int16_t channel) { + getChannel(channel).close(); + adapters.erase(adapters.find(channel)); } -void Connection::openChannel(u_int16_t channel) { - if (channel == 0) - throw ConnectionException(504, "Illegal channel 0"); - if (channels.find(channel) != channels.end()) - throw ConnectionException(504, "Channel already open: " + channel); - channels.insert( - channel, - new Channel( - client->getProtocolVersion(), context, channel, framemax, - broker.getQueues().getStore(), settings.stagingThreshold)); -} -void Connection::closeChannel(u_int16_t channel) { - getChannel(channel).close(); // throws if channel does not exist. - channels.erase(channels.find(channel)); +BrokerAdapter& Connection::getAdapter(u_int16_t id) { + AdapterMap::iterator i = adapters.find(id); + if (i == adapters.end()) { + Channel* ch=new Channel( + client->getProtocolVersion(), context, id, + framemax, broker.getQueues().getStore(), + settings.stagingThreshold); + BrokerAdapter* adapter = new BrokerAdapter(ch, *this, broker); + adapters.insert(id, adapter); + return *adapter; + } + else + return *i; } - -Channel& Connection::getChannel(u_int16_t channel){ - ChannelMap::iterator i = channels.find(channel); - if(i == channels.end()) - throw ConnectionException(504, "Unknown channel: " + channel); - return *i; +Channel& Connection::getChannel(u_int16_t id) { + return getAdapter(id).getChannel(); } |