summaryrefslogtreecommitdiff
path: root/cpp/lib/broker/Connection.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-01-18 06:27:50 +0000
committerAlan Conway <aconway@apache.org>2007-01-18 06:27:50 +0000
commit9e8a7c77a94a92c6cf92cf60be508817f0778040 (patch)
tree0ff1882596aae1da3fd6bd6c2b4e13a3b9ae5de7 /cpp/lib/broker/Connection.cpp
parent762cba1e7db15cb3c9e987a9edcbf3106c7244cb (diff)
downloadqpid-python-9e8a7c77a94a92c6cf92cf60be508817f0778040.tar.gz
There are a ton of FIXMES and request/response IDs are not yet working fully
but all tests are passing. * broker::Broker: Removed requester/responder from broker. * framing::BodyHandler: added Requester/Responder to BodyHandler, becomes the base class for channel adapters in broker and client. * broker::BrokerAdapter: Inherit BodyHandler, wraps a broker::Channel. Hide private *HandlerImpl detail classes in BodyHandler.cpp. * broker::Connection: Requester/Responder/Adapter now per-channel. Connection channel map replaced with adapter map of BrokerAdapters. handle* functions moved to BrokerAdapter. All methods now handled by a BrokerAdapter for the relevant channel. ChannelHandlerImpl is repsonsible for checking that - No method on a non-0 channel is processed before open() - Channel 0 methods only happen on channel 0 and similar for non-zero methods Checks are not yet complete (see FIXMES) * client::ResponseHandler: fix for client hang if broker crashs. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497319 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/broker/Connection.cpp')
-rw-r--r--cpp/lib/broker/Connection.cpp135
1 files changed, 20 insertions, 115 deletions
diff --git a/cpp/lib/broker/Connection.cpp b/cpp/lib/broker/Connection.cpp
index c220fb9092..3db5bcd074 100644
--- a/cpp/lib/broker/Connection.cpp
+++ b/cpp/lib/broker/Connection.cpp
@@ -23,10 +23,6 @@
#include "Connection.h"
-// TODO aconway 2007-01-16: move to channel.
-#include "Requester.h"
-#include "Responder.h"
-
using namespace boost;
using namespace qpid::sys;
using namespace qpid::framing;
@@ -36,9 +32,6 @@ namespace qpid {
namespace broker {
Connection::Connection(SessionContext* context_, Broker& broker_) :
- adapter(*this),
- requester(broker.getRequester()),
- responder(broker.getResponder()),
context(context_),
framemax(65536),
heartbeat(0),
@@ -65,89 +58,15 @@ Exchange::shared_ptr Connection::findExchange(const string& name){
return broker.getExchanges().get(name);
}
-void Connection::handleMethod(
- u_int16_t channel, qpid::framing::AMQBody::shared_ptr body)
-{
- AMQMethodBody::shared_ptr method =
- shared_polymorphic_cast<AMQMethodBody, AMQBody>(body);
- try{
- method->invoke(adapter, channel);
- }catch(ChannelException& e){
- closeChannel(channel);
- client->getChannel().close(
- channel, e.code, e.toString(),
- method->amqpClassId(), method->amqpMethodId());
- }catch(ConnectionException& e){
- client->getConnection().close(
- 0, e.code, e.toString(),
- method->amqpClassId(), method->amqpMethodId());
- }catch(std::exception& e){
- client->getConnection().close(
- 0, 541/*internal error*/, e.what(),
- method->amqpClassId(), method->amqpMethodId());
- }
-}
void Connection::received(qpid::framing::AMQFrame* frame){
- u_int16_t channel = frame->getChannel();
- AMQBody::shared_ptr body = frame->getBody();
- switch(body->type())
- {
- case REQUEST_BODY:
- responder.received(AMQRequestBody::getData(body));
- handleMethod(channel, body);
- break;
- case RESPONSE_BODY:
- // Must process responses before marking them received.
- handleMethod(channel, body);
- requester.processed(AMQResponseBody::getData(body));
- break;
- // TODO aconway 2007-01-15: Leftover from 0-8 support, remove.
- case METHOD_BODY:
- handleMethod(channel, body);
- break;
- case HEADER_BODY:
- handleHeader(
- channel, shared_polymorphic_cast<AMQHeaderBody>(body));
- break;
-
- case CONTENT_BODY:
- handleContent(
- channel, shared_polymorphic_cast<AMQContentBody>(body));
- break;
-
- case HEARTBEAT_BODY:
- assert(channel == 0);
- handleHeartbeat(
- shared_polymorphic_cast<AMQHeartbeatBody>(body));
- break;
- }
-}
-
-/**
- * An OutputHandler that does request/response procssing before
- * delgating to another OutputHandler.
- */
-Connection::Sender::Sender(
- OutputHandler& oh, Requester& req, Responder& resp)
- : out(oh), requester(req), responder(resp)
-{}
-
-void Connection::Sender::send(AMQFrame* frame) {
- AMQBody::shared_ptr body = frame->getBody();
- u_int16_t type = body->type();
- if (type == REQUEST_BODY)
- requester.sending(AMQRequestBody::getData(body));
- else if (type == RESPONSE_BODY)
- responder.sending(AMQResponseBody::getData(body));
- out.send(frame);
+ getAdapter(frame->getChannel()).handleBody(frame->getBody());
}
void Connection::initiated(qpid::framing::ProtocolInitiation* header) {
if (client.get())
- // TODO aconway 2007-01-16: correct code.
+ // TODO aconway 2007-01-16: correct error code.
throw ConnectionException(0, "Connection initiated twice");
-
client.reset(new qpid::framing::AMQP_ClientProxy(
context, header->getMajor(), header->getMinor()));
FieldTable properties;
@@ -159,7 +78,6 @@ void Connection::initiated(qpid::framing::ProtocolInitiation* header) {
mechanisms, locales);
}
-
void Connection::idleOut(){}
void Connection::idleIn(){}
@@ -177,42 +95,29 @@ void Connection::closed(){
}
}
-// TODO aconway 2007-01-16: colapse these.
-void Connection::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){
- getChannel(channel).handleHeader(body);
-}
-
-void Connection::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){
- getChannel(channel).handleContent(body);
-}
-
-void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
- std::cout << "Connection::handleHeartbeat()" << std::endl;
+void Connection::closeChannel(u_int16_t channel) {
+ getChannel(channel).close();
+ adapters.erase(adapters.find(channel));
}
-void Connection::openChannel(u_int16_t channel) {
- if (channel == 0)
- throw ConnectionException(504, "Illegal channel 0");
- if (channels.find(channel) != channels.end())
- throw ConnectionException(504, "Channel already open: " + channel);
- channels.insert(
- channel,
- new Channel(
- client->getProtocolVersion(), context, channel, framemax,
- broker.getQueues().getStore(), settings.stagingThreshold));
-}
-void Connection::closeChannel(u_int16_t channel) {
- getChannel(channel).close(); // throws if channel does not exist.
- channels.erase(channels.find(channel));
+BrokerAdapter& Connection::getAdapter(u_int16_t id) {
+ AdapterMap::iterator i = adapters.find(id);
+ if (i == adapters.end()) {
+ Channel* ch=new Channel(
+ client->getProtocolVersion(), context, id,
+ framemax, broker.getQueues().getStore(),
+ settings.stagingThreshold);
+ BrokerAdapter* adapter = new BrokerAdapter(ch, *this, broker);
+ adapters.insert(id, adapter);
+ return *adapter;
+ }
+ else
+ return *i;
}
-
-Channel& Connection::getChannel(u_int16_t channel){
- ChannelMap::iterator i = channels.find(channel);
- if(i == channels.end())
- throw ConnectionException(504, "Unknown channel: " + channel);
- return *i;
+Channel& Connection::getChannel(u_int16_t id) {
+ return getAdapter(id).getChannel();
}