summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/ClientConnection.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-08-05 13:25:36 +0000
committerGordon Sim <gsim@apache.org>2007-08-05 13:25:36 +0000
commitb2efcb6ed3e1e2104836928cda81ed69f2f24559 (patch)
tree392ae403dcb0d32da3edaeaf8a1f497679d9102c /cpp/src/qpid/client/ClientConnection.cpp
parentb2fadec5d86e278d96112e915e67aec934e91046 (diff)
downloadqpid-python-b2efcb6ed3e1e2104836928cda81ed69f2f24559.tar.gz
Added first cut of generated client interface.
Old channel interface still supported; shares SessionCore with the new interface. Todo: allow applications to signal completion of received commands; keywrod args for interface. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@562866 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/ClientConnection.cpp')
-rw-r--r--cpp/src/qpid/client/ClientConnection.cpp128
1 files changed, 24 insertions, 104 deletions
diff --git a/cpp/src/qpid/client/ClientConnection.cpp b/cpp/src/qpid/client/ClientConnection.cpp
index c998ec30df..3ae1478152 100644
--- a/cpp/src/qpid/client/ClientConnection.cpp
+++ b/cpp/src/qpid/client/ClientConnection.cpp
@@ -41,31 +41,20 @@ using namespace qpid::sys;
namespace qpid {
namespace client {
-const std::string Connection::OK("OK");
-
-Connection::Connection(
- bool _debug, uint32_t _max_frame_size,
- framing::ProtocolVersion _version
- ) : channelIdCounter(0), version(_version), max_frame_size(_max_frame_size),
- defaultConnector(version, _debug, _max_frame_size),
- isOpen(false), debug(_debug)
-{
- setConnector(defaultConnector);
-
- handler.maxFrameSize = _max_frame_size;
-}
+Connection::Connection(bool _debug, uint32_t _max_frame_size, framing::ProtocolVersion _version) :
+ channelIdCounter(0), version(_version),
+ max_frame_size(_max_frame_size),
+ impl(new ConnectionImpl(boost::shared_ptr<Connector>(new Connector(_version, _debug)))),
+ isOpen(false) {}
+
+Connection::Connection(boost::shared_ptr<Connector> c) :
+ channelIdCounter(0), version(framing::highestProtocolVersion),
+ max_frame_size(65536),
+ impl(new ConnectionImpl(c)),
+ isOpen(false) {}
Connection::~Connection(){}
-void Connection::setConnector(Connector& con)
-{
- connector = &con;
- connector->setInputHandler(&handler);
- connector->setTimeoutHandler(this);
- connector->setShutdownHandler(this);
- out = connector->getOutputHandler();
-}
-
void Connection::open(
const std::string& host, int port,
const std::string& uid, const std::string& pwd, const std::string& vhost)
@@ -73,97 +62,28 @@ void Connection::open(
if (isOpen)
THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open");
- //wire up the handler:
- handler.in = boost::bind(&Connection::received, this, _1);
- handler.out = boost::bind(&Connector::send, connector, _1);
- handler.onClose = boost::bind(&Connection::closeChannels, this);
-
- handler.uid = uid;
- handler.pwd = pwd;
- handler.vhost = vhost;
-
- connector->connect(host, port);
- connector->init();
- handler.waitForOpen();
+ impl->open(host, port, uid, pwd, vhost);
isOpen = true;
}
-void Connection::shutdown() {
- //this indicates that the socket to the server has closed we do
- //not want to send a close request (or any other requests)
- if(markClosed()) {
- QPID_LOG(info, "Connection to peer closed!");
- closeChannels();
- }
-}
-
-void Connection::close(
- ReplyCode /*code*/, const string& /*msg*/, ClassId /*classId*/, MethodId /*methodId*/
-)
-{
- if(markClosed()) {
- try {
- handler.close();
- } catch (const std::exception& e) {
- QPID_LOG(error, "Exception closing channel: " << e.what());
- }
- closeChannels();
- connector->close();
- }
-}
-
-bool Connection::markClosed()
-{
- Mutex::ScopedLock locker(shutdownLock);
- if (isOpen) {
- isOpen = false;
- return true;
- } else {
- return false;
- }
-}
-
-void Connection::closeChannels()
-{
- using boost::bind;
- for_each(channels.begin(), channels.end(),
- bind(&Channel::closeInternal,
- bind(&ChannelMap::value_type::second, _1)));
- channels.clear();
-}
-
void Connection::openChannel(Channel& channel) {
ChannelId id = ++channelIdCounter;
- assert (channels.find(id) == channels.end());
- assert(out);
- channels[id] = &channel;
- channel.open(id, *this);
-}
-
-void Connection::erase(ChannelId id) {
- channels.erase(id);
+ SessionCore::shared_ptr session(new SessionCore(id, impl, max_frame_size));
+ impl->allocated(session);
+ channel.open(impl, session);
+ session->open();
}
-void Connection::received(AMQFrame& frame){
- ChannelId id = frame.getChannel();
- Channel* channel = channels[id];
- if (channel == 0) {
- throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str());
- }
- channel->channelHandler.incoming(frame);
-}
-
-void Connection::send(AMQFrame& frame) {
- out->send(frame);
-}
-
-void Connection::idleIn(){
- connector->close();
+Session Connection::newSession() {
+ ChannelId id = ++channelIdCounter;
+ SessionCore::shared_ptr session(new SessionCore(id, impl, max_frame_size));
+ impl->allocated(session);
+ return Session(impl, session);
}
-void Connection::idleOut(){
- AMQFrame frame(version, 0, new AMQHeartbeatBody());
- out->send(frame);
+void Connection::close()
+{
+ impl->close();
}
}} // namespace qpid::client