diff options
author | Alan Conway <aconway@apache.org> | 2008-10-09 19:36:51 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-10-09 19:36:51 +0000 |
commit | d6901e52ab3ee9c40eddc4ad3b4787127c36d874 (patch) | |
tree | 85b9ba2e0d0922be150480392ec1b706a6df5cd0 /cpp/src/qpid/client/ConnectionImpl.cpp | |
parent | 016ae5acebab0eaf6dd70f5d4d653fdfee93925d (diff) | |
download | qpid-python-d6901e52ab3ee9c40eddc4ad3b4787127c36d874.tar.gz |
Client-side support for amq.faiover exchange. Connection::getKnownBrokers provides latest list.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703237 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/ConnectionImpl.cpp')
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 29 |
1 files changed, 25 insertions, 4 deletions
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index d9ac65c1b3..910c908ee2 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -22,8 +22,10 @@ #include "Connector.h" #include "ConnectionSettings.h" #include "SessionImpl.h" +#include "FailoverListener.h" #include "qpid/log/Statement.h" +#include "qpid/Url.h" #include "qpid/framing/enum.h" #include "qpid/framing/reply_exceptions.h" @@ -40,7 +42,9 @@ using namespace qpid::framing::connection;//for connection error codes ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings) : Bounds(settings.maxFrameSize * settings.bounds), handler(settings, v), - version(v) + failover(new FailoverListener()), + version(v), + nextChannel(1) { QPID_LOG(debug, "ConnectionImpl created for " << version); handler.in = boost::bind(&ConnectionImpl::incoming, this, _1); @@ -56,12 +60,14 @@ ConnectionImpl::~ConnectionImpl() { // Important to close the connector first, to ensure the // connector thread does not call on us while the destructor // is running. - if (connector) connector->close(); + failover.reset(); + if (connector) connector->close(); } -void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session) +void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session, uint16_t channel) { Mutex::ScopedLock l(lock); + session->setChannel(channel ? channel : nextChannel++); boost::weak_ptr<SessionImpl>& s = sessions[session->getChannel()]; if (s.lock()) throw SessionBusyException(); s = session; @@ -102,7 +108,9 @@ void ConnectionImpl::open() connector->setShutdownHandler(this); connector->connect(host, port); connector->init(); - handler.waitForOpen(); + handler.waitForOpen(); + + if (failover.get()) failover->start(shared_from_this()); } void ConnectionImpl::idleIn() @@ -162,3 +170,16 @@ const ConnectionSettings& ConnectionImpl::getNegotiatedSettings() return handler; } +std::vector<qpid::Url> ConnectionImpl::getKnownBrokers() { + // FIXME aconway 2008-10-08: initialize failover list from openOk or settings + return failover ? failover->getKnownBrokers() : std::vector<qpid::Url>(); +} + +boost::shared_ptr<SessionImpl> ConnectionImpl::newSession(const std::string& name, uint32_t timeout, uint16_t channel) { + boost::shared_ptr<SessionImpl> simpl(new SessionImpl(name, shared_from_this())); + addSession(simpl, channel); + simpl->open(timeout); + return simpl; +} + +void ConnectionImpl::stopFailoverListener() { failover.reset(); } |