summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/ConnectionImpl.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-10-09 19:36:51 +0000
committerAlan Conway <aconway@apache.org>2008-10-09 19:36:51 +0000
commitd6901e52ab3ee9c40eddc4ad3b4787127c36d874 (patch)
tree85b9ba2e0d0922be150480392ec1b706a6df5cd0 /cpp/src/qpid/client/ConnectionImpl.cpp
parent016ae5acebab0eaf6dd70f5d4d653fdfee93925d (diff)
downloadqpid-python-d6901e52ab3ee9c40eddc4ad3b4787127c36d874.tar.gz
Client-side support for amq.faiover exchange. Connection::getKnownBrokers provides latest list.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703237 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/ConnectionImpl.cpp')
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp29
1 files changed, 25 insertions, 4 deletions
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index d9ac65c1b3..910c908ee2 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -22,8 +22,10 @@
#include "Connector.h"
#include "ConnectionSettings.h"
#include "SessionImpl.h"
+#include "FailoverListener.h"
#include "qpid/log/Statement.h"
+#include "qpid/Url.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/reply_exceptions.h"
@@ -40,7 +42,9 @@ using namespace qpid::framing::connection;//for connection error codes
ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings)
: Bounds(settings.maxFrameSize * settings.bounds),
handler(settings, v),
- version(v)
+ failover(new FailoverListener()),
+ version(v),
+ nextChannel(1)
{
QPID_LOG(debug, "ConnectionImpl created for " << version);
handler.in = boost::bind(&ConnectionImpl::incoming, this, _1);
@@ -56,12 +60,14 @@ ConnectionImpl::~ConnectionImpl() {
// Important to close the connector first, to ensure the
// connector thread does not call on us while the destructor
// is running.
- if (connector) connector->close();
+ failover.reset();
+ if (connector) connector->close();
}
-void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session)
+void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session, uint16_t channel)
{
Mutex::ScopedLock l(lock);
+ session->setChannel(channel ? channel : nextChannel++);
boost::weak_ptr<SessionImpl>& s = sessions[session->getChannel()];
if (s.lock()) throw SessionBusyException();
s = session;
@@ -102,7 +108,9 @@ void ConnectionImpl::open()
connector->setShutdownHandler(this);
connector->connect(host, port);
connector->init();
- handler.waitForOpen();
+ handler.waitForOpen();
+
+ if (failover.get()) failover->start(shared_from_this());
}
void ConnectionImpl::idleIn()
@@ -162,3 +170,16 @@ const ConnectionSettings& ConnectionImpl::getNegotiatedSettings()
return handler;
}
+std::vector<qpid::Url> ConnectionImpl::getKnownBrokers() {
+ // FIXME aconway 2008-10-08: initialize failover list from openOk or settings
+ return failover ? failover->getKnownBrokers() : std::vector<qpid::Url>();
+}
+
+boost::shared_ptr<SessionImpl> ConnectionImpl::newSession(const std::string& name, uint32_t timeout, uint16_t channel) {
+ boost::shared_ptr<SessionImpl> simpl(new SessionImpl(name, shared_from_this()));
+ addSession(simpl, channel);
+ simpl->open(timeout);
+ return simpl;
+}
+
+void ConnectionImpl::stopFailoverListener() { failover.reset(); }