summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/ClientConnection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/ClientConnection.cpp')
-rw-r--r--cpp/src/qpid/client/ClientConnection.cpp52
1 files changed, 21 insertions, 31 deletions
diff --git a/cpp/src/qpid/client/ClientConnection.cpp b/cpp/src/qpid/client/ClientConnection.cpp
index 4b8f32a26f..c998ec30df 100644
--- a/cpp/src/qpid/client/ClientConnection.cpp
+++ b/cpp/src/qpid/client/ClientConnection.cpp
@@ -46,11 +46,13 @@ const std::string Connection::OK("OK");
Connection::Connection(
bool _debug, uint32_t _max_frame_size,
framing::ProtocolVersion _version
-) : channelIdCounter(0), version(_version), max_frame_size(_max_frame_size),
+ ) : channelIdCounter(0), version(_version), max_frame_size(_max_frame_size),
defaultConnector(version, _debug, _max_frame_size),
isOpen(false), debug(_debug)
{
setConnector(defaultConnector);
+
+ handler.maxFrameSize = _max_frame_size;
}
Connection::~Connection(){}
@@ -58,7 +60,7 @@ Connection::~Connection(){}
void Connection::setConnector(Connector& con)
{
connector = &con;
- connector->setInputHandler(this);
+ connector->setInputHandler(&handler);
connector->setTimeoutHandler(this);
connector->setShutdownHandler(this);
out = connector->getOutputHandler();
@@ -70,10 +72,19 @@ void Connection::open(
{
if (isOpen)
THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open");
+
+ //wire up the handler:
+ handler.in = boost::bind(&Connection::received, this, _1);
+ handler.out = boost::bind(&Connector::send, connector, _1);
+ handler.onClose = boost::bind(&Connection::closeChannels, this);
+
+ handler.uid = uid;
+ handler.pwd = pwd;
+ handler.vhost = vhost;
+
connector->connect(host, port);
- channels[0] = &channel0;
- channel0.open(0, *this);
- channel0.protocolInit(uid, pwd, vhost);
+ connector->init();
+ handler.waitForOpen();
isOpen = true;
}
@@ -87,14 +98,12 @@ void Connection::shutdown() {
}
void Connection::close(
- ReplyCode code, const string& msg, ClassId classId, MethodId methodId
+ ReplyCode /*code*/, const string& /*msg*/, ClassId /*classId*/, MethodId /*methodId*/
)
{
if(markClosed()) {
try {
- channel0.sendAndReceive<ConnectionCloseOkBody>(
- make_shared_ptr(new ConnectionCloseBody(
- getVersion(), code, msg, classId, methodId)));
+ handler.close();
} catch (const std::exception& e) {
QPID_LOG(error, "Exception closing channel: " << e.what());
}
@@ -138,35 +147,16 @@ void Connection::erase(ChannelId id) {
void Connection::received(AMQFrame& frame){
ChannelId id = frame.getChannel();
Channel* channel = channels[id];
- if (channel == 0)
- THROW_QPID_ERROR(
- PROTOCOL_ERROR+504,
- (boost::format("Invalid channel number %g") % id).str());
- try{
- channel->getHandlers().in->handle(frame);
- }catch(const qpid::QpidError& e){
- std::cout << "Caught error while handling " << frame << ": " << e.what() <<std::endl;
- channelException(
- *channel, dynamic_cast<AMQMethodBody*>(frame.getBody().get()), e);
+ if (channel == 0) {
+ throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str());
}
+ channel->channelHandler.incoming(frame);
}
void Connection::send(AMQFrame& frame) {
out->send(frame);
}
-void Connection::channelException(
- Channel& channel, AMQMethodBody* method, const QpidError& e)
-{
- int code = (e.code >= PROTOCOL_ERROR) ? e.code - PROTOCOL_ERROR : 500;
- string msg = e.msg;
- if(method == 0)
- channel.close(code, msg);
- else
- channel.close(
- code, msg, method->amqpClassId(), method->amqpMethodId());
-}
-
void Connection::idleIn(){
connector->close();
}