From 98ccae7574a18f8d0a1f9e28e86ccfde4541c81f Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Tue, 30 Jan 2007 18:20:00 +0000 Subject: * client/* framing/*: fixed client-side request ID processing. * cpp/tests/InProcessBroker.h: For tests: connect to an in-process broker directly, bypass the network. Keeps log of client/broker conversation for verification in test code. * cpp/tests/FramingTest.cpp (testRequestResponseRoundtrip): Client/broker round-trip test for request/reponse IDs and response mark. * APRAcceptor.cpp (APRAcceptor): fixed valgrind uninitialized error. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@501502 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/lib/client/Connection.cpp | 60 ++++++++++++++++++++++++------------------- 1 file changed, 33 insertions(+), 27 deletions(-) (limited to 'cpp/lib/client/Connection.cpp') diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp index 19d5cce7db..bf6c44570d 100644 --- a/cpp/lib/client/Connection.cpp +++ b/cpp/lib/client/Connection.cpp @@ -27,6 +27,8 @@ #include #include #include +#include +#include using namespace qpid::framing; using namespace qpid::sys; @@ -41,45 +43,59 @@ ChannelId Connection::channelIdCounter; const std::string Connection::OK("OK"); Connection::Connection( - bool debug, u_int32_t _max_frame_size, + bool _debug, u_int32_t _max_frame_size, const framing::ProtocolVersion& _version -) : max_frame_size(_max_frame_size), closed(true), - version(_version) -{ - connector = new Connector(version, debug, _max_frame_size); -} +) : version(_version), max_frame_size(_max_frame_size), + defaultConnector(version, debug, max_frame_size), + connector(&defaultConnector), + isOpen(false), debug(_debug) +{} Connection::~Connection(){ - delete connector; + close(); } -void Connection::open( - const std::string& _host, int _port, const std::string& uid, - const std::string& pwd, const std::string& virtualhost) +void Connection::setConnector(Connector& con) { - - host = _host; - port = _port; + connector = &con; connector->setInputHandler(this); connector->setTimeoutHandler(this); connector->setShutdownHandler(this); out = connector->getOutputHandler(); +} + +void Connection::open( + const std::string& host, int port, + const std::string& uid, const std::string& pwd, const std::string& vhost) +{ + if (isOpen) + THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open"); connector->connect(host, port); - - // Open the special channel 0. channels[0] = &channel0; channel0.open(0, *this); - channel0.protocolInit(uid, pwd, virtualhost); + channel0.protocolInit(uid, pwd, vhost); + isOpen = true; } +void Connection::shutdown() { + close(); +} + void Connection::close( ReplyCode code, const string& msg, ClassId classId, MethodId methodId ) { - if(!closed) { + if(isOpen) { + // TODO aconway 2007-01-29: Exception handling - could end up + // partly closed. + isOpen = false; channel0.sendAndReceive( new ConnectionCloseBody( getVersion(), code, msg, classId, methodId)); + while(!channels.empty()) { + channels.begin()->second->close(); + channels.erase(channels.begin()); + } connector->close(); } } @@ -140,14 +156,4 @@ void Connection::idleOut(){ out->send(new AMQFrame(version, 0, new AMQHeartbeatBody())); } -void Connection::shutdown(){ - closed = true; - //close all channels, also removes them from the map. - while(!channels.empty()){ - Channel* channel = channels.begin()->second; - if (channel != 0) - channel->close(); - } -} - }} // namespace qpid::client -- cgit v1.2.1