summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-09-06 19:37:52 +0000
committerGordon Sim <gsim@apache.org>2013-09-06 19:37:52 +0000
commit9f89516b7a87b0a60abc665c4d83ae7daecbc8ad (patch)
treef8fdb59196e37d70fe902244afa28e5a442abd6d
parente1739e9a6f90a8029a099806e3778205f4f6e8d2 (diff)
downloadqpid-python-9f89516b7a87b0a60abc665c4d83ae7daecbc8ad.tar.gz
QPID-4932: expose reconnect&replay logic for application to control itself
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1520673 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/examples/messaging/CMakeLists.txt2
-rw-r--r--qpid/cpp/examples/messaging/server_reconnect.cpp97
-rw-r--r--qpid/cpp/include/qpid/messaging/Connection.h27
-rw-r--r--qpid/cpp/src/qpid/client/Connection.cpp2
-rw-r--r--qpid/cpp/src/qpid/client/Connection.h2
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp47
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h5
-rw-r--r--qpid/cpp/src/qpid/messaging/Connection.cpp14
-rw-r--r--qpid/cpp/src/qpid/messaging/ConnectionImpl.h3
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp96
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h8
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp13
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h3
13 files changed, 281 insertions, 38 deletions
diff --git a/qpid/cpp/examples/messaging/CMakeLists.txt b/qpid/cpp/examples/messaging/CMakeLists.txt
index 6b2a06ae3a..5481f2f0bf 100644
--- a/qpid/cpp/examples/messaging/CMakeLists.txt
+++ b/qpid/cpp/examples/messaging/CMakeLists.txt
@@ -49,6 +49,7 @@ add_messaging_example(map_sender)
add_messaging_example(client)
add_messaging_example(server)
+add_messaging_example(server_reconnect)
# These don't need Boost or OptionParser
add_executable(hello_world hello_world.cpp)
@@ -74,6 +75,7 @@ install (FILES
${CMAKE_CURRENT_SOURCE_DIR}/map_sender.cpp
${CMAKE_CURRENT_SOURCE_DIR}/client.cpp
${CMAKE_CURRENT_SOURCE_DIR}/server.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/server_reconnect.cpp
DESTINATION ${QPID_INSTALL_EXAMPLESDIR}/messaging
COMPONENT ${QPID_COMPONENT_EXAMPLES})
diff --git a/qpid/cpp/examples/messaging/server_reconnect.cpp b/qpid/cpp/examples/messaging/server_reconnect.cpp
new file mode 100644
index 0000000000..ab7147760f
--- /dev/null
+++ b/qpid/cpp/examples/messaging/server_reconnect.cpp
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/messaging/Address.h>
+#include <qpid/messaging/Connection.h>
+#include <qpid/messaging/exceptions.h>
+#include <qpid/messaging/Message.h>
+#include <qpid/messaging/Receiver.h>
+#include <qpid/messaging/Sender.h>
+#include <qpid/messaging/Session.h>
+
+#include <algorithm>
+#include <cstdlib>
+#include <iostream>
+#include <memory>
+#include <sstream>
+
+using namespace qpid::messaging;
+
+using std::stringstream;
+using std::string;
+
+int main(int argc, char** argv) {
+ std::string url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
+ std::string connectionOptions = argc > 2 ? argv[2] : "";
+
+ Connection connection(url, connectionOptions);
+ try {
+ connection.open();
+ Session session = connection.createSession();
+ Receiver receiver = session.createReceiver("service_queue; {create: always}");
+
+ while (true) {
+ try {
+ if (!connection.isOpen()) {
+ // This demonstrates use of application controlled
+ // reconnect; the reconnect connection option may
+ // also be used to automatcially handle
+ // reconnection
+ if (url.empty()) {
+ connection.reconnect();
+ } else {
+ connection.reconnect(url);
+ }
+ std::cout << "Reconnected to " << connection.getUrl() << std::endl;
+ }
+ Message request = receiver.fetch();
+ const Address& address = request.getReplyTo();
+ if (address) {
+ Sender sender = session.createSender(address);
+ std::string s = request.getContent();
+ std::transform(s.begin(), s.end(), s.begin(), toupper);
+ Message response(s);
+ sender.send(response);
+ std::cout << "Processed request: "
+ << request.getContent()
+ << " -> "
+ << response.getContent() << std::endl;
+ session.acknowledge();
+ sender.close();
+ } else {
+ std::cerr << "Error: no reply address specified for request: " << request.getContent() << std::endl;
+ session.reject(request);
+ }
+ } catch (const TransportFailure&) {
+ std::cout << "Connection to broker was lost, please enter URL to reconnect to (or hit return to use original url):" << std::endl;
+ if (!std::getline(std::cin, url)) {
+ return 1;
+ }
+ }
+ }
+ connection.close();
+ return 0;
+ } catch(const std::exception& error) {
+ std::cout << error.what() << std::endl;
+ connection.close();
+ }
+ return 1;
+}
diff --git a/qpid/cpp/include/qpid/messaging/Connection.h b/qpid/cpp/include/qpid/messaging/Connection.h
index 58e61c4655..bb83685731 100644
--- a/qpid/cpp/include/qpid/messaging/Connection.h
+++ b/qpid/cpp/include/qpid/messaging/Connection.h
@@ -98,6 +98,33 @@ class QPID_MESSAGING_CLASS_EXTERN Connection : public qpid::messaging::Handle<Co
QPID_MESSAGING_EXTERN void open();
QPID_MESSAGING_EXTERN bool isOpen();
QPID_MESSAGING_EXTERN bool isOpen() const;
+
+ /**
+ * Attempts to reconnect to the specified url, re-establish
+ * existing sessions, senders and receivers and resend any indoubt
+ * messages.
+ *
+ * This can be used to directly control reconnect behaviour rather
+ * than using the reconnect option for automatically handling
+ * that.
+ */
+ QPID_MESSAGING_EXTERN void reconnect(const std::string& url);
+ /**
+ * Attempts to reconnect to the original url, including any
+ * specified reconnect_urls, re-establish existing sessions,
+ * senders and receivers and resend any indoubt messages.
+ *
+ * This can be used to directly control reconnect behaviour rather
+ * than using the reconnect option for automatically handling
+ * that.
+ */
+ QPID_MESSAGING_EXTERN void reconnect();
+ /**
+ * returns a url reprsenting the broker the client is currently
+ * connected to (or an e,pty string if it is not connected).
+ */
+ QPID_MESSAGING_EXTERN std::string getUrl() const;
+
/**
* Closes a connection and all sessions associated with it. An
* opened connection must be closed before the last handle is
diff --git a/qpid/cpp/src/qpid/client/Connection.cpp b/qpid/cpp/src/qpid/client/Connection.cpp
index 8b4eafccaa..26e69233af 100644
--- a/qpid/cpp/src/qpid/client/Connection.cpp
+++ b/qpid/cpp/src/qpid/client/Connection.cpp
@@ -127,7 +127,7 @@ void Connection::open(const ConnectionSettings& settings)
impl->registerFailureCallback ( failureCallback );
}
-const ConnectionSettings& Connection::getNegotiatedSettings()
+const ConnectionSettings& Connection::getNegotiatedSettings() const
{
if (!isOpen())
throw Exception(QPID_MSG("Connection is not open."));
diff --git a/qpid/cpp/src/qpid/client/Connection.h b/qpid/cpp/src/qpid/client/Connection.h
index c0db0f301d..fb502cb40a 100644
--- a/qpid/cpp/src/qpid/client/Connection.h
+++ b/qpid/cpp/src/qpid/client/Connection.h
@@ -216,7 +216,7 @@ class QPID_CLIENT_CLASS_EXTERN Connection
/**
* Return the set of client negotiated settings
*/
- QPID_CLIENT_EXTERN const ConnectionSettings& getNegotiatedSettings();
+ QPID_CLIENT_EXTERN const ConnectionSettings& getNegotiatedSettings() const;
friend struct ConnectionAccess; ///<@internal
friend class SessionBase_0_10; ///<@internal
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
index 366218c002..9406c992fe 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -43,6 +43,7 @@ using qpid::framing::Uuid;
namespace {
const std::string TCP("tcp");
+const std::string COLON(":");
double FOREVER(std::numeric_limits<double>::max());
// Time values in seconds can be specified as integer or floating point values.
@@ -86,7 +87,7 @@ bool expired(const sys::AbsTime& start, double timeout)
} // namespace
ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) :
- replaceUrls(false), reconnect(false), timeout(FOREVER), limit(-1),
+ replaceUrls(false), autoReconnect(false), timeout(FOREVER), limit(-1),
minReconnectInterval(0.001), maxReconnectInterval(2),
retries(0), reconnectOnLimitExceeded(true), disableAutoDecode(false)
{
@@ -106,7 +107,7 @@ void ConnectionImpl::setOption(const std::string& name, const Variant& value)
{
sys::Mutex::ScopedLock l(lock);
if (name == "reconnect") {
- reconnect = value;
+ autoReconnect = value;
} else if (name == "reconnect-timeout" || name == "reconnect_timeout") {
timeout = timeValue(value);
} else if (name == "reconnect-limit" || name == "reconnect_limit") {
@@ -256,7 +257,7 @@ void ConnectionImpl::open()
void ConnectionImpl::reopen()
{
- if (!reconnect) {
+ if (!autoReconnect) {
throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
}
open();
@@ -267,7 +268,7 @@ void ConnectionImpl::connect(const qpid::sys::AbsTime& started)
{
QPID_LOG(debug, "Starting connection, urls=" << asString(urls));
for (double i = minReconnectInterval; !tryConnect(); i = std::min(i*2, maxReconnectInterval)) {
- if (!reconnect) {
+ if (!autoReconnect) {
throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
}
if (limit >= 0 && retries++ >= limit) {
@@ -343,6 +344,44 @@ bool ConnectionImpl::backoff()
}
}
+void ConnectionImpl::reconnect(const std::string& u)
+{
+ sys::Mutex::ScopedLock l(lock);
+ try {
+ QPID_LOG(info, "Trying to connect to " << u << "...");
+ Url url(u, settings.protocol.size() ? settings.protocol : TCP);
+ if (url.getUser().size()) settings.username = url.getUser();
+ if (url.getPass().size()) settings.password = url.getPass();
+ connection.open(url, settings);
+ QPID_LOG(info, "Connected to " << u);
+ mergeUrls(connection.getInitialBrokers(), l);
+ if (!resetSessions(l)) throw qpid::messaging::TransportFailure("Could not re-establish sessions");
+ } catch (const qpid::TransportFailure& e) {
+ QPID_LOG(info, "Failed to connect to " << u << ": " << e.what());
+ throw qpid::messaging::TransportFailure(e.what());
+ } catch (const std::exception& e) {
+ QPID_LOG(info, "Error while connecting to " << u << ": " << e.what());
+ throw qpid::messaging::MessagingException(e.what());
+ }
+}
+
+void ConnectionImpl::reconnect()
+{
+ if (!tryConnect()) {
+ throw qpid::messaging::TransportFailure("Could not reconnect");
+ }
+}
+std::string ConnectionImpl::getUrl() const
+{
+ if (isOpen()) {
+ std::stringstream u;
+ u << connection.getNegotiatedSettings().protocol << COLON << connection.getNegotiatedSettings().host << COLON << connection.getNegotiatedSettings().port;
+ return u.str();
+ } else {
+ return std::string();
+ }
+}
+
std::string ConnectionImpl::getAuthenticatedUsername()
{
return connection.getNegotiatedSettings().username;
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
index 00ac30a6df..ae839dc690 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
@@ -53,6 +53,9 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl
void setOption(const std::string& name, const qpid::types::Variant& value);
bool backoff();
std::string getAuthenticatedUsername();
+ void reconnect(const std::string& url);
+ void reconnect();
+ std::string getUrl() const;
bool getAutoDecode() const;
private:
typedef std::map<std::string, qpid::messaging::Session> Sessions;
@@ -64,7 +67,7 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl
bool replaceUrls; // Replace rather than merging with reconnect-urls
std::vector<std::string> urls;
qpid::client::ConnectionSettings settings;
- bool reconnect;
+ bool autoReconnect;
double timeout;
int32_t limit;
double minReconnectInterval;
diff --git a/qpid/cpp/src/qpid/messaging/Connection.cpp b/qpid/cpp/src/qpid/messaging/Connection.cpp
index fde931038b..0993ef5dd9 100644
--- a/qpid/cpp/src/qpid/messaging/Connection.cpp
+++ b/qpid/cpp/src/qpid/messaging/Connection.cpp
@@ -90,4 +90,18 @@ std::string Connection::getAuthenticatedUsername()
{
return impl->getAuthenticatedUsername();
}
+
+void Connection::reconnect(const std::string& url)
+{
+ impl->reconnect(url);
+}
+void Connection::reconnect()
+{
+ impl->reconnect();
+}
+std::string Connection::getUrl() const
+{
+ return impl->getUrl();
+}
+
}} // namespace qpid::messaging
diff --git a/qpid/cpp/src/qpid/messaging/ConnectionImpl.h b/qpid/cpp/src/qpid/messaging/ConnectionImpl.h
index 1e11d9a6d5..92c6d91b10 100644
--- a/qpid/cpp/src/qpid/messaging/ConnectionImpl.h
+++ b/qpid/cpp/src/qpid/messaging/ConnectionImpl.h
@@ -45,6 +45,9 @@ class ConnectionImpl : public virtual qpid::RefCounted
virtual Session getSession(const std::string& name) const = 0;
virtual void setOption(const std::string& name, const qpid::types::Variant& value) = 0;
virtual std::string getAuthenticatedUsername() = 0;
+ virtual void reconnect(const std::string& url) = 0;
+ virtual void reconnect() = 0;
+ virtual std::string getUrl() const = 0;
private:
};
}} // namespace qpid::messaging
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index 0d4885c4c3..afa2db6791 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -79,7 +79,7 @@ ConnectionContext::~ConnectionContext()
bool ConnectionContext::isOpen() const
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
- return pn_connection_state(connection) & (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
+ return state == CONNECTED && pn_connection_state(connection) & (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
}
void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn)
@@ -399,7 +399,7 @@ void ConnectionContext::check()
if (state == DISCONNECTED) {
if (ConnectionOptions::reconnect) {
reset();
- reconnect();
+ autoconnect();
} else {
throw qpid::messaging::TransportFailure("Disconnected (reconnect disabled)");
}
@@ -794,7 +794,7 @@ bool expired(const sys::AbsTime& start, double timeout)
const std::string COLON(":");
}
-void ConnectionContext::reconnect()
+void ConnectionContext::autoconnect()
{
qpid::sys::AbsTime started(qpid::sys::now());
QPID_LOG(debug, "Starting connection, urls=" << asString(urls));
@@ -821,29 +821,7 @@ bool ConnectionContext::tryConnect()
try {
QPID_LOG(info, "Trying to connect to " << *i << "...");
if (tryConnect(qpid::Url(*i, protocol.empty() ? qpid::Address::TCP : protocol))) {
- QPID_LOG(info, "Connected to " << *i);
- if (sasl.get()) {
- wakeupDriver();
- while (!sasl->authenticated()) {
- QPID_LOG(debug, id << " Waiting to be authenticated...");
- wait();
- }
- QPID_LOG(debug, id << " Authenticated");
- }
-
- QPID_LOG(debug, id << " Opening...");
- setProperties();
- pn_connection_open(connection);
- wakeupDriver(); //want to write
- while (pn_connection_state(connection) & PN_REMOTE_UNINIT) {
- wait();
- }
- if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) {
- throw qpid::messaging::ConnectionError("Failed to open connection");
- }
- QPID_LOG(debug, id << " Opened");
-
- return restartSessions();
+ return true;
}
} catch (const qpid::messaging::TransportFailure& e) {
QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what());
@@ -852,9 +830,26 @@ bool ConnectionContext::tryConnect()
return false;
}
-bool ConnectionContext::tryConnect(const std::string& url)
+void ConnectionContext::reconnect(const std::string& url)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!");
+ if (!driver) driver = DriverImpl::getDefault();
+ reset();
+ if (!tryConnect(qpid::Url(url, protocol.empty() ? qpid::Address::TCP : protocol))) {
+ throw qpid::messaging::TransportFailure("Failed to connect");
+ }
+}
+
+void ConnectionContext::reconnect()
{
- return tryConnect(qpid::Url(url, protocol.empty() ? qpid::Address::TCP : protocol));
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!");
+ if (!driver) driver = DriverImpl::getDefault();
+ reset();
+ if (!tryConnect()) {
+ throw qpid::messaging::TransportFailure("Failed to reconnect");
+ }
}
bool ConnectionContext::tryConnect(const Url& url)
@@ -863,11 +858,54 @@ bool ConnectionContext::tryConnect(const Url& url)
if (url.getPass().size()) password = url.getPass();
for (Url::const_iterator i = url.begin(); i != url.end(); ++i) {
- if (tryConnect(*i)) return true;
+ if (tryConnect(*i)) {
+ QPID_LOG(info, "Connected to " << *i);
+ setCurrentUrl(*i);
+ if (sasl.get()) {
+ wakeupDriver();
+ while (!sasl->authenticated()) {
+ QPID_LOG(debug, id << " Waiting to be authenticated...");
+ wait();
+ }
+ QPID_LOG(debug, id << " Authenticated");
+ }
+
+ QPID_LOG(debug, id << " Opening...");
+ setProperties();
+ pn_connection_open(connection);
+ wakeupDriver(); //want to write
+ while (pn_connection_state(connection) & PN_REMOTE_UNINIT) {
+ wait();
+ }
+ if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) {
+ throw qpid::messaging::ConnectionError("Failed to open connection");
+ }
+ QPID_LOG(debug, id << " Opened");
+
+ return restartSessions();
+ }
}
return false;
}
+void ConnectionContext::setCurrentUrl(const qpid::Address& a)
+{
+ std::stringstream u;
+ u << a;
+ currentUrl = u.str();
+}
+
+std::string ConnectionContext::getUrl() const
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ if (state == CONNECTED) {
+ return currentUrl;
+ } else {
+ return std::string();
+ }
+}
+
+
bool ConnectionContext::tryConnect(const qpid::Address& address)
{
transport = driver->getTransport(address.protocol, *this);
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
index 3b80f71f1d..7387dba13e 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
@@ -106,6 +106,9 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
framing::ProtocolVersion getVersion() const;
//additionally, Transport needs:
void opened();//signal successful connection
+ void reconnect(const std::string& url);
+ void reconnect();
+ std::string getUrl() const;
const qpid::sys::SecuritySettings* getTransportSecuritySettings();
private:
@@ -122,6 +125,7 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
bool readHeader;
bool haveOutput;
std::string id;
+ std::string currentUrl;
enum {
DISCONNECTED,
CONNECTING,
@@ -144,14 +148,14 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
void checkClosed(boost::shared_ptr<SessionContext>, pn_link_t*);
void wakeupDriver();
void attach(pn_link_t*, int credit=0);
- void reconnect();
+ void autoconnect();
bool tryConnect();
- bool tryConnect(const std::string& url);
bool tryConnect(const qpid::Url& url);
bool tryConnect(const qpid::Address& address);
void reset();
bool restartSessions();
void restartSession(boost::shared_ptr<SessionContext>);
+ void setCurrentUrl(const qpid::Address&);
std::size_t decodePlain(const char* buffer, std::size_t size);
std::size_t encodePlain(char* buffer, std::size_t size);
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp
index 0c4ec2bfcb..c1ab108a61 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp
@@ -81,4 +81,17 @@ std::string ConnectionHandle::getAuthenticatedUsername()
return connection->getAuthenticatedUsername();
}
+void ConnectionHandle::reconnect(const std::string& url)
+{
+ connection->reconnect(url);
+}
+void ConnectionHandle::reconnect()
+{
+ connection->reconnect();
+}
+std::string ConnectionHandle::getUrl() const
+{
+ return connection->getUrl();
+}
+
}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h
index d1eb27f6de..0238313f93 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h
@@ -49,6 +49,9 @@ class ConnectionHandle : public qpid::messaging::ConnectionImpl
Session getSession(const std::string& name) const;
void setOption(const std::string& name, const qpid::types::Variant& value);
std::string getAuthenticatedUsername();
+ void reconnect(const std::string& url);
+ void reconnect();
+ std::string getUrl() const;
private:
boost::shared_ptr<ConnectionContext> connection;
};