summaryrefslogtreecommitdiff
path: root/cpp/lib/client/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib/client/Connection.cpp')
-rw-r--r--cpp/lib/client/Connection.cpp60
1 files changed, 33 insertions, 27 deletions
diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp
index 19d5cce7db..bf6c44570d 100644
--- a/cpp/lib/client/Connection.cpp
+++ b/cpp/lib/client/Connection.cpp
@@ -27,6 +27,8 @@
#include <iostream>
#include <sstream>
#include <MethodBodyInstances.h>
+#include <boost/bind.hpp>
+#include <functional>
using namespace qpid::framing;
using namespace qpid::sys;
@@ -41,45 +43,59 @@ ChannelId Connection::channelIdCounter;
const std::string Connection::OK("OK");
Connection::Connection(
- bool debug, u_int32_t _max_frame_size,
+ bool _debug, u_int32_t _max_frame_size,
const framing::ProtocolVersion& _version
-) : max_frame_size(_max_frame_size), closed(true),
- version(_version)
-{
- connector = new Connector(version, debug, _max_frame_size);
-}
+) : version(_version), max_frame_size(_max_frame_size),
+ defaultConnector(version, debug, max_frame_size),
+ connector(&defaultConnector),
+ isOpen(false), debug(_debug)
+{}
Connection::~Connection(){
- delete connector;
+ close();
}
-void Connection::open(
- const std::string& _host, int _port, const std::string& uid,
- const std::string& pwd, const std::string& virtualhost)
+void Connection::setConnector(Connector& con)
{
-
- host = _host;
- port = _port;
+ connector = &con;
connector->setInputHandler(this);
connector->setTimeoutHandler(this);
connector->setShutdownHandler(this);
out = connector->getOutputHandler();
+}
+
+void Connection::open(
+ const std::string& host, int port,
+ const std::string& uid, const std::string& pwd, const std::string& vhost)
+{
+ if (isOpen)
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open");
connector->connect(host, port);
-
- // Open the special channel 0.
channels[0] = &channel0;
channel0.open(0, *this);
- channel0.protocolInit(uid, pwd, virtualhost);
+ channel0.protocolInit(uid, pwd, vhost);
+ isOpen = true;
}
+void Connection::shutdown() {
+ close();
+}
+
void Connection::close(
ReplyCode code, const string& msg, ClassId classId, MethodId methodId
)
{
- if(!closed) {
+ if(isOpen) {
+ // TODO aconway 2007-01-29: Exception handling - could end up
+ // partly closed.
+ isOpen = false;
channel0.sendAndReceive<ConnectionCloseOkBody>(
new ConnectionCloseBody(
getVersion(), code, msg, classId, methodId));
+ while(!channels.empty()) {
+ channels.begin()->second->close();
+ channels.erase(channels.begin());
+ }
connector->close();
}
}
@@ -140,14 +156,4 @@ void Connection::idleOut(){
out->send(new AMQFrame(version, 0, new AMQHeartbeatBody()));
}
-void Connection::shutdown(){
- closed = true;
- //close all channels, also removes them from the map.
- while(!channels.empty()){
- Channel* channel = channels.begin()->second;
- if (channel != 0)
- channel->close();
- }
-}
-
}} // namespace qpid::client