diff options
author | Gordon Sim <gsim@apache.org> | 2007-08-02 18:09:48 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-08-02 18:09:48 +0000 |
commit | 89aa36d093182e9e191c000504c174663932458f (patch) | |
tree | 06d7e9a3feb4abdaab74b79c94e4352dfa40adaa /cpp/src/qpid/client/ClientConnection.cpp | |
parent | 2290d4ed915f1202bcd6cd50b1a85f27f3eb6cd2 (diff) | |
download | qpid-python-89aa36d093182e9e191c000504c174663932458f.tar.gz |
Some restructuring of the client code:
* Introduced three separate 'handlers' for the connection, channel and execution 'layers'.
* Support for asynchronous retrieval of response or completion status.
* Channel methods no longer included in execution layers command id count.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@562212 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/ClientConnection.cpp')
-rw-r--r-- | cpp/src/qpid/client/ClientConnection.cpp | 52 |
1 files changed, 21 insertions, 31 deletions
diff --git a/cpp/src/qpid/client/ClientConnection.cpp b/cpp/src/qpid/client/ClientConnection.cpp index 4b8f32a26f..c998ec30df 100644 --- a/cpp/src/qpid/client/ClientConnection.cpp +++ b/cpp/src/qpid/client/ClientConnection.cpp @@ -46,11 +46,13 @@ const std::string Connection::OK("OK"); Connection::Connection( bool _debug, uint32_t _max_frame_size, framing::ProtocolVersion _version -) : channelIdCounter(0), version(_version), max_frame_size(_max_frame_size), + ) : channelIdCounter(0), version(_version), max_frame_size(_max_frame_size), defaultConnector(version, _debug, _max_frame_size), isOpen(false), debug(_debug) { setConnector(defaultConnector); + + handler.maxFrameSize = _max_frame_size; } Connection::~Connection(){} @@ -58,7 +60,7 @@ Connection::~Connection(){} void Connection::setConnector(Connector& con) { connector = &con; - connector->setInputHandler(this); + connector->setInputHandler(&handler); connector->setTimeoutHandler(this); connector->setShutdownHandler(this); out = connector->getOutputHandler(); @@ -70,10 +72,19 @@ void Connection::open( { if (isOpen) THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open"); + + //wire up the handler: + handler.in = boost::bind(&Connection::received, this, _1); + handler.out = boost::bind(&Connector::send, connector, _1); + handler.onClose = boost::bind(&Connection::closeChannels, this); + + handler.uid = uid; + handler.pwd = pwd; + handler.vhost = vhost; + connector->connect(host, port); - channels[0] = &channel0; - channel0.open(0, *this); - channel0.protocolInit(uid, pwd, vhost); + connector->init(); + handler.waitForOpen(); isOpen = true; } @@ -87,14 +98,12 @@ void Connection::shutdown() { } void Connection::close( - ReplyCode code, const string& msg, ClassId classId, MethodId methodId + ReplyCode /*code*/, const string& /*msg*/, ClassId /*classId*/, MethodId /*methodId*/ ) { if(markClosed()) { try { - channel0.sendAndReceive<ConnectionCloseOkBody>( - make_shared_ptr(new ConnectionCloseBody( - getVersion(), code, msg, classId, methodId))); + handler.close(); } catch (const std::exception& e) { QPID_LOG(error, "Exception closing channel: " << e.what()); } @@ -138,35 +147,16 @@ void Connection::erase(ChannelId id) { void Connection::received(AMQFrame& frame){ ChannelId id = frame.getChannel(); Channel* channel = channels[id]; - if (channel == 0) - THROW_QPID_ERROR( - PROTOCOL_ERROR+504, - (boost::format("Invalid channel number %g") % id).str()); - try{ - channel->getHandlers().in->handle(frame); - }catch(const qpid::QpidError& e){ - std::cout << "Caught error while handling " << frame << ": " << e.what() <<std::endl; - channelException( - *channel, dynamic_cast<AMQMethodBody*>(frame.getBody().get()), e); + if (channel == 0) { + throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str()); } + channel->channelHandler.incoming(frame); } void Connection::send(AMQFrame& frame) { out->send(frame); } -void Connection::channelException( - Channel& channel, AMQMethodBody* method, const QpidError& e) -{ - int code = (e.code >= PROTOCOL_ERROR) ? e.code - PROTOCOL_ERROR : 500; - string msg = e.msg; - if(method == 0) - channel.close(code, msg); - else - channel.close( - code, msg, method->amqpClassId(), method->amqpMethodId()); -} - void Connection::idleIn(){ connector->close(); } |