diff options
author | Gordon Sim <gsim@apache.org> | 2007-08-05 13:25:36 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-08-05 13:25:36 +0000 |
commit | b2efcb6ed3e1e2104836928cda81ed69f2f24559 (patch) | |
tree | 392ae403dcb0d32da3edaeaf8a1f497679d9102c /cpp/src/qpid/client/ClientConnection.cpp | |
parent | b2fadec5d86e278d96112e915e67aec934e91046 (diff) | |
download | qpid-python-b2efcb6ed3e1e2104836928cda81ed69f2f24559.tar.gz |
Added first cut of generated client interface.
Old channel interface still supported; shares SessionCore with the new interface.
Todo: allow applications to signal completion of received commands; keywrod args for interface.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@562866 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/ClientConnection.cpp')
-rw-r--r-- | cpp/src/qpid/client/ClientConnection.cpp | 128 |
1 files changed, 24 insertions, 104 deletions
diff --git a/cpp/src/qpid/client/ClientConnection.cpp b/cpp/src/qpid/client/ClientConnection.cpp index c998ec30df..3ae1478152 100644 --- a/cpp/src/qpid/client/ClientConnection.cpp +++ b/cpp/src/qpid/client/ClientConnection.cpp @@ -41,31 +41,20 @@ using namespace qpid::sys; namespace qpid { namespace client { -const std::string Connection::OK("OK"); - -Connection::Connection( - bool _debug, uint32_t _max_frame_size, - framing::ProtocolVersion _version - ) : channelIdCounter(0), version(_version), max_frame_size(_max_frame_size), - defaultConnector(version, _debug, _max_frame_size), - isOpen(false), debug(_debug) -{ - setConnector(defaultConnector); - - handler.maxFrameSize = _max_frame_size; -} +Connection::Connection(bool _debug, uint32_t _max_frame_size, framing::ProtocolVersion _version) : + channelIdCounter(0), version(_version), + max_frame_size(_max_frame_size), + impl(new ConnectionImpl(boost::shared_ptr<Connector>(new Connector(_version, _debug)))), + isOpen(false) {} + +Connection::Connection(boost::shared_ptr<Connector> c) : + channelIdCounter(0), version(framing::highestProtocolVersion), + max_frame_size(65536), + impl(new ConnectionImpl(c)), + isOpen(false) {} Connection::~Connection(){} -void Connection::setConnector(Connector& con) -{ - connector = &con; - connector->setInputHandler(&handler); - connector->setTimeoutHandler(this); - connector->setShutdownHandler(this); - out = connector->getOutputHandler(); -} - void Connection::open( const std::string& host, int port, const std::string& uid, const std::string& pwd, const std::string& vhost) @@ -73,97 +62,28 @@ void Connection::open( if (isOpen) THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open"); - //wire up the handler: - handler.in = boost::bind(&Connection::received, this, _1); - handler.out = boost::bind(&Connector::send, connector, _1); - handler.onClose = boost::bind(&Connection::closeChannels, this); - - handler.uid = uid; - handler.pwd = pwd; - handler.vhost = vhost; - - connector->connect(host, port); - connector->init(); - handler.waitForOpen(); + impl->open(host, port, uid, pwd, vhost); isOpen = true; } -void Connection::shutdown() { - //this indicates that the socket to the server has closed we do - //not want to send a close request (or any other requests) - if(markClosed()) { - QPID_LOG(info, "Connection to peer closed!"); - closeChannels(); - } -} - -void Connection::close( - ReplyCode /*code*/, const string& /*msg*/, ClassId /*classId*/, MethodId /*methodId*/ -) -{ - if(markClosed()) { - try { - handler.close(); - } catch (const std::exception& e) { - QPID_LOG(error, "Exception closing channel: " << e.what()); - } - closeChannels(); - connector->close(); - } -} - -bool Connection::markClosed() -{ - Mutex::ScopedLock locker(shutdownLock); - if (isOpen) { - isOpen = false; - return true; - } else { - return false; - } -} - -void Connection::closeChannels() -{ - using boost::bind; - for_each(channels.begin(), channels.end(), - bind(&Channel::closeInternal, - bind(&ChannelMap::value_type::second, _1))); - channels.clear(); -} - void Connection::openChannel(Channel& channel) { ChannelId id = ++channelIdCounter; - assert (channels.find(id) == channels.end()); - assert(out); - channels[id] = &channel; - channel.open(id, *this); -} - -void Connection::erase(ChannelId id) { - channels.erase(id); + SessionCore::shared_ptr session(new SessionCore(id, impl, max_frame_size)); + impl->allocated(session); + channel.open(impl, session); + session->open(); } -void Connection::received(AMQFrame& frame){ - ChannelId id = frame.getChannel(); - Channel* channel = channels[id]; - if (channel == 0) { - throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str()); - } - channel->channelHandler.incoming(frame); -} - -void Connection::send(AMQFrame& frame) { - out->send(frame); -} - -void Connection::idleIn(){ - connector->close(); +Session Connection::newSession() { + ChannelId id = ++channelIdCounter; + SessionCore::shared_ptr session(new SessionCore(id, impl, max_frame_size)); + impl->allocated(session); + return Session(impl, session); } -void Connection::idleOut(){ - AMQFrame frame(version, 0, new AMQHeartbeatBody()); - out->send(frame); +void Connection::close() +{ + impl->close(); } }} // namespace qpid::client |