diff options
Diffstat (limited to 'cpp/src/qpid/client/ClientConnection.cpp')
-rw-r--r-- | cpp/src/qpid/client/ClientConnection.cpp | 52 |
1 files changed, 21 insertions, 31 deletions
diff --git a/cpp/src/qpid/client/ClientConnection.cpp b/cpp/src/qpid/client/ClientConnection.cpp index 4b8f32a26f..c998ec30df 100644 --- a/cpp/src/qpid/client/ClientConnection.cpp +++ b/cpp/src/qpid/client/ClientConnection.cpp @@ -46,11 +46,13 @@ const std::string Connection::OK("OK"); Connection::Connection( bool _debug, uint32_t _max_frame_size, framing::ProtocolVersion _version -) : channelIdCounter(0), version(_version), max_frame_size(_max_frame_size), + ) : channelIdCounter(0), version(_version), max_frame_size(_max_frame_size), defaultConnector(version, _debug, _max_frame_size), isOpen(false), debug(_debug) { setConnector(defaultConnector); + + handler.maxFrameSize = _max_frame_size; } Connection::~Connection(){} @@ -58,7 +60,7 @@ Connection::~Connection(){} void Connection::setConnector(Connector& con) { connector = &con; - connector->setInputHandler(this); + connector->setInputHandler(&handler); connector->setTimeoutHandler(this); connector->setShutdownHandler(this); out = connector->getOutputHandler(); @@ -70,10 +72,19 @@ void Connection::open( { if (isOpen) THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open"); + + //wire up the handler: + handler.in = boost::bind(&Connection::received, this, _1); + handler.out = boost::bind(&Connector::send, connector, _1); + handler.onClose = boost::bind(&Connection::closeChannels, this); + + handler.uid = uid; + handler.pwd = pwd; + handler.vhost = vhost; + connector->connect(host, port); - channels[0] = &channel0; - channel0.open(0, *this); - channel0.protocolInit(uid, pwd, vhost); + connector->init(); + handler.waitForOpen(); isOpen = true; } @@ -87,14 +98,12 @@ void Connection::shutdown() { } void Connection::close( - ReplyCode code, const string& msg, ClassId classId, MethodId methodId + ReplyCode /*code*/, const string& /*msg*/, ClassId /*classId*/, MethodId /*methodId*/ ) { if(markClosed()) { try { - channel0.sendAndReceive<ConnectionCloseOkBody>( - make_shared_ptr(new ConnectionCloseBody( - getVersion(), code, msg, classId, methodId))); + handler.close(); } catch (const std::exception& e) { QPID_LOG(error, "Exception closing channel: " << e.what()); } @@ -138,35 +147,16 @@ void Connection::erase(ChannelId id) { void Connection::received(AMQFrame& frame){ ChannelId id = frame.getChannel(); Channel* channel = channels[id]; - if (channel == 0) - THROW_QPID_ERROR( - PROTOCOL_ERROR+504, - (boost::format("Invalid channel number %g") % id).str()); - try{ - channel->getHandlers().in->handle(frame); - }catch(const qpid::QpidError& e){ - std::cout << "Caught error while handling " << frame << ": " << e.what() <<std::endl; - channelException( - *channel, dynamic_cast<AMQMethodBody*>(frame.getBody().get()), e); + if (channel == 0) { + throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str()); } + channel->channelHandler.incoming(frame); } void Connection::send(AMQFrame& frame) { out->send(frame); } -void Connection::channelException( - Channel& channel, AMQMethodBody* method, const QpidError& e) -{ - int code = (e.code >= PROTOCOL_ERROR) ? e.code - PROTOCOL_ERROR : 500; - string msg = e.msg; - if(method == 0) - channel.close(code, msg); - else - channel.close( - code, msg, method->amqpClassId(), method->amqpMethodId()); -} - void Connection::idleIn(){ connector->close(); } |