From d6901e52ab3ee9c40eddc4ad3b4787127c36d874 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 9 Oct 2008 19:36:51 +0000 Subject: Client-side support for amq.faiover exchange. Connection::getKnownBrokers provides latest list. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703237 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/client/ConnectionImpl.cpp | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) (limited to 'cpp/src/qpid/client/ConnectionImpl.cpp') diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index d9ac65c1b3..910c908ee2 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -22,8 +22,10 @@ #include "Connector.h" #include "ConnectionSettings.h" #include "SessionImpl.h" +#include "FailoverListener.h" #include "qpid/log/Statement.h" +#include "qpid/Url.h" #include "qpid/framing/enum.h" #include "qpid/framing/reply_exceptions.h" @@ -40,7 +42,9 @@ using namespace qpid::framing::connection;//for connection error codes ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings) : Bounds(settings.maxFrameSize * settings.bounds), handler(settings, v), - version(v) + failover(new FailoverListener()), + version(v), + nextChannel(1) { QPID_LOG(debug, "ConnectionImpl created for " << version); handler.in = boost::bind(&ConnectionImpl::incoming, this, _1); @@ -56,12 +60,14 @@ ConnectionImpl::~ConnectionImpl() { // Important to close the connector first, to ensure the // connector thread does not call on us while the destructor // is running. - if (connector) connector->close(); + failover.reset(); + if (connector) connector->close(); } -void ConnectionImpl::addSession(const boost::shared_ptr& session) +void ConnectionImpl::addSession(const boost::shared_ptr& session, uint16_t channel) { Mutex::ScopedLock l(lock); + session->setChannel(channel ? channel : nextChannel++); boost::weak_ptr& s = sessions[session->getChannel()]; if (s.lock()) throw SessionBusyException(); s = session; @@ -102,7 +108,9 @@ void ConnectionImpl::open() connector->setShutdownHandler(this); connector->connect(host, port); connector->init(); - handler.waitForOpen(); + handler.waitForOpen(); + + if (failover.get()) failover->start(shared_from_this()); } void ConnectionImpl::idleIn() @@ -162,3 +170,16 @@ const ConnectionSettings& ConnectionImpl::getNegotiatedSettings() return handler; } +std::vector ConnectionImpl::getKnownBrokers() { + // FIXME aconway 2008-10-08: initialize failover list from openOk or settings + return failover ? failover->getKnownBrokers() : std::vector(); +} + +boost::shared_ptr ConnectionImpl::newSession(const std::string& name, uint32_t timeout, uint16_t channel) { + boost::shared_ptr simpl(new SessionImpl(name, shared_from_this())); + addSession(simpl, channel); + simpl->open(timeout); + return simpl; +} + +void ConnectionImpl::stopFailoverListener() { failover.reset(); } -- cgit v1.2.1