diff options
Diffstat (limited to 'cpp/src/qpid/client/ConnectionImpl.cpp')
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 29 |
1 files changed, 25 insertions, 4 deletions
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index d9ac65c1b3..910c908ee2 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -22,8 +22,10 @@ #include "Connector.h" #include "ConnectionSettings.h" #include "SessionImpl.h" +#include "FailoverListener.h" #include "qpid/log/Statement.h" +#include "qpid/Url.h" #include "qpid/framing/enum.h" #include "qpid/framing/reply_exceptions.h" @@ -40,7 +42,9 @@ using namespace qpid::framing::connection;//for connection error codes ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings) : Bounds(settings.maxFrameSize * settings.bounds), handler(settings, v), - version(v) + failover(new FailoverListener()), + version(v), + nextChannel(1) { QPID_LOG(debug, "ConnectionImpl created for " << version); handler.in = boost::bind(&ConnectionImpl::incoming, this, _1); @@ -56,12 +60,14 @@ ConnectionImpl::~ConnectionImpl() { // Important to close the connector first, to ensure the // connector thread does not call on us while the destructor // is running. - if (connector) connector->close(); + failover.reset(); + if (connector) connector->close(); } -void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session) +void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session, uint16_t channel) { Mutex::ScopedLock l(lock); + session->setChannel(channel ? channel : nextChannel++); boost::weak_ptr<SessionImpl>& s = sessions[session->getChannel()]; if (s.lock()) throw SessionBusyException(); s = session; @@ -102,7 +108,9 @@ void ConnectionImpl::open() connector->setShutdownHandler(this); connector->connect(host, port); connector->init(); - handler.waitForOpen(); + handler.waitForOpen(); + + if (failover.get()) failover->start(shared_from_this()); } void ConnectionImpl::idleIn() @@ -162,3 +170,16 @@ const ConnectionSettings& ConnectionImpl::getNegotiatedSettings() return handler; } +std::vector<qpid::Url> ConnectionImpl::getKnownBrokers() { + // FIXME aconway 2008-10-08: initialize failover list from openOk or settings + return failover ? failover->getKnownBrokers() : std::vector<qpid::Url>(); +} + +boost::shared_ptr<SessionImpl> ConnectionImpl::newSession(const std::string& name, uint32_t timeout, uint16_t channel) { + boost::shared_ptr<SessionImpl> simpl(new SessionImpl(name, shared_from_this())); + addSession(simpl, channel); + simpl->open(timeout); + return simpl; +} + +void ConnectionImpl::stopFailoverListener() { failover.reset(); } |