diff options
author | Gordon Sim <gsim@apache.org> | 2013-09-06 19:37:52 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2013-09-06 19:37:52 +0000 |
commit | 9f89516b7a87b0a60abc665c4d83ae7daecbc8ad (patch) | |
tree | f8fdb59196e37d70fe902244afa28e5a442abd6d | |
parent | e1739e9a6f90a8029a099806e3778205f4f6e8d2 (diff) | |
download | qpid-python-9f89516b7a87b0a60abc665c4d83ae7daecbc8ad.tar.gz |
QPID-4932: expose reconnect&replay logic for application to control itself
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1520673 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/examples/messaging/CMakeLists.txt | 2 | ||||
-rw-r--r-- | qpid/cpp/examples/messaging/server_reconnect.cpp | 97 | ||||
-rw-r--r-- | qpid/cpp/include/qpid/messaging/Connection.h | 27 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/Connection.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/Connection.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp | 47 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/Connection.cpp | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/ConnectionImpl.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp | 96 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp | 13 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h | 3 |
13 files changed, 281 insertions, 38 deletions
diff --git a/qpid/cpp/examples/messaging/CMakeLists.txt b/qpid/cpp/examples/messaging/CMakeLists.txt index 6b2a06ae3a..5481f2f0bf 100644 --- a/qpid/cpp/examples/messaging/CMakeLists.txt +++ b/qpid/cpp/examples/messaging/CMakeLists.txt @@ -49,6 +49,7 @@ add_messaging_example(map_sender) add_messaging_example(client) add_messaging_example(server) +add_messaging_example(server_reconnect) # These don't need Boost or OptionParser add_executable(hello_world hello_world.cpp) @@ -74,6 +75,7 @@ install (FILES ${CMAKE_CURRENT_SOURCE_DIR}/map_sender.cpp ${CMAKE_CURRENT_SOURCE_DIR}/client.cpp ${CMAKE_CURRENT_SOURCE_DIR}/server.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/server_reconnect.cpp DESTINATION ${QPID_INSTALL_EXAMPLESDIR}/messaging COMPONENT ${QPID_COMPONENT_EXAMPLES}) diff --git a/qpid/cpp/examples/messaging/server_reconnect.cpp b/qpid/cpp/examples/messaging/server_reconnect.cpp new file mode 100644 index 0000000000..ab7147760f --- /dev/null +++ b/qpid/cpp/examples/messaging/server_reconnect.cpp @@ -0,0 +1,97 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/messaging/Address.h> +#include <qpid/messaging/Connection.h> +#include <qpid/messaging/exceptions.h> +#include <qpid/messaging/Message.h> +#include <qpid/messaging/Receiver.h> +#include <qpid/messaging/Sender.h> +#include <qpid/messaging/Session.h> + +#include <algorithm> +#include <cstdlib> +#include <iostream> +#include <memory> +#include <sstream> + +using namespace qpid::messaging; + +using std::stringstream; +using std::string; + +int main(int argc, char** argv) { + std::string url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; + std::string connectionOptions = argc > 2 ? argv[2] : ""; + + Connection connection(url, connectionOptions); + try { + connection.open(); + Session session = connection.createSession(); + Receiver receiver = session.createReceiver("service_queue; {create: always}"); + + while (true) { + try { + if (!connection.isOpen()) { + // This demonstrates use of application controlled + // reconnect; the reconnect connection option may + // also be used to automatcially handle + // reconnection + if (url.empty()) { + connection.reconnect(); + } else { + connection.reconnect(url); + } + std::cout << "Reconnected to " << connection.getUrl() << std::endl; + } + Message request = receiver.fetch(); + const Address& address = request.getReplyTo(); + if (address) { + Sender sender = session.createSender(address); + std::string s = request.getContent(); + std::transform(s.begin(), s.end(), s.begin(), toupper); + Message response(s); + sender.send(response); + std::cout << "Processed request: " + << request.getContent() + << " -> " + << response.getContent() << std::endl; + session.acknowledge(); + sender.close(); + } else { + std::cerr << "Error: no reply address specified for request: " << request.getContent() << std::endl; + session.reject(request); + } + } catch (const TransportFailure&) { + std::cout << "Connection to broker was lost, please enter URL to reconnect to (or hit return to use original url):" << std::endl; + if (!std::getline(std::cin, url)) { + return 1; + } + } + } + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + connection.close(); + } + return 1; +} diff --git a/qpid/cpp/include/qpid/messaging/Connection.h b/qpid/cpp/include/qpid/messaging/Connection.h index 58e61c4655..bb83685731 100644 --- a/qpid/cpp/include/qpid/messaging/Connection.h +++ b/qpid/cpp/include/qpid/messaging/Connection.h @@ -98,6 +98,33 @@ class QPID_MESSAGING_CLASS_EXTERN Connection : public qpid::messaging::Handle<Co QPID_MESSAGING_EXTERN void open(); QPID_MESSAGING_EXTERN bool isOpen(); QPID_MESSAGING_EXTERN bool isOpen() const; + + /** + * Attempts to reconnect to the specified url, re-establish + * existing sessions, senders and receivers and resend any indoubt + * messages. + * + * This can be used to directly control reconnect behaviour rather + * than using the reconnect option for automatically handling + * that. + */ + QPID_MESSAGING_EXTERN void reconnect(const std::string& url); + /** + * Attempts to reconnect to the original url, including any + * specified reconnect_urls, re-establish existing sessions, + * senders and receivers and resend any indoubt messages. + * + * This can be used to directly control reconnect behaviour rather + * than using the reconnect option for automatically handling + * that. + */ + QPID_MESSAGING_EXTERN void reconnect(); + /** + * returns a url reprsenting the broker the client is currently + * connected to (or an e,pty string if it is not connected). + */ + QPID_MESSAGING_EXTERN std::string getUrl() const; + /** * Closes a connection and all sessions associated with it. An * opened connection must be closed before the last handle is diff --git a/qpid/cpp/src/qpid/client/Connection.cpp b/qpid/cpp/src/qpid/client/Connection.cpp index 8b4eafccaa..26e69233af 100644 --- a/qpid/cpp/src/qpid/client/Connection.cpp +++ b/qpid/cpp/src/qpid/client/Connection.cpp @@ -127,7 +127,7 @@ void Connection::open(const ConnectionSettings& settings) impl->registerFailureCallback ( failureCallback ); } -const ConnectionSettings& Connection::getNegotiatedSettings() +const ConnectionSettings& Connection::getNegotiatedSettings() const { if (!isOpen()) throw Exception(QPID_MSG("Connection is not open.")); diff --git a/qpid/cpp/src/qpid/client/Connection.h b/qpid/cpp/src/qpid/client/Connection.h index c0db0f301d..fb502cb40a 100644 --- a/qpid/cpp/src/qpid/client/Connection.h +++ b/qpid/cpp/src/qpid/client/Connection.h @@ -216,7 +216,7 @@ class QPID_CLIENT_CLASS_EXTERN Connection /** * Return the set of client negotiated settings */ - QPID_CLIENT_EXTERN const ConnectionSettings& getNegotiatedSettings(); + QPID_CLIENT_EXTERN const ConnectionSettings& getNegotiatedSettings() const; friend struct ConnectionAccess; ///<@internal friend class SessionBase_0_10; ///<@internal diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index 366218c002..9406c992fe 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -43,6 +43,7 @@ using qpid::framing::Uuid; namespace { const std::string TCP("tcp"); +const std::string COLON(":"); double FOREVER(std::numeric_limits<double>::max()); // Time values in seconds can be specified as integer or floating point values. @@ -86,7 +87,7 @@ bool expired(const sys::AbsTime& start, double timeout) } // namespace ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) : - replaceUrls(false), reconnect(false), timeout(FOREVER), limit(-1), + replaceUrls(false), autoReconnect(false), timeout(FOREVER), limit(-1), minReconnectInterval(0.001), maxReconnectInterval(2), retries(0), reconnectOnLimitExceeded(true), disableAutoDecode(false) { @@ -106,7 +107,7 @@ void ConnectionImpl::setOption(const std::string& name, const Variant& value) { sys::Mutex::ScopedLock l(lock); if (name == "reconnect") { - reconnect = value; + autoReconnect = value; } else if (name == "reconnect-timeout" || name == "reconnect_timeout") { timeout = timeValue(value); } else if (name == "reconnect-limit" || name == "reconnect_limit") { @@ -256,7 +257,7 @@ void ConnectionImpl::open() void ConnectionImpl::reopen() { - if (!reconnect) { + if (!autoReconnect) { throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)"); } open(); @@ -267,7 +268,7 @@ void ConnectionImpl::connect(const qpid::sys::AbsTime& started) { QPID_LOG(debug, "Starting connection, urls=" << asString(urls)); for (double i = minReconnectInterval; !tryConnect(); i = std::min(i*2, maxReconnectInterval)) { - if (!reconnect) { + if (!autoReconnect) { throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)"); } if (limit >= 0 && retries++ >= limit) { @@ -343,6 +344,44 @@ bool ConnectionImpl::backoff() } } +void ConnectionImpl::reconnect(const std::string& u) +{ + sys::Mutex::ScopedLock l(lock); + try { + QPID_LOG(info, "Trying to connect to " << u << "..."); + Url url(u, settings.protocol.size() ? settings.protocol : TCP); + if (url.getUser().size()) settings.username = url.getUser(); + if (url.getPass().size()) settings.password = url.getPass(); + connection.open(url, settings); + QPID_LOG(info, "Connected to " << u); + mergeUrls(connection.getInitialBrokers(), l); + if (!resetSessions(l)) throw qpid::messaging::TransportFailure("Could not re-establish sessions"); + } catch (const qpid::TransportFailure& e) { + QPID_LOG(info, "Failed to connect to " << u << ": " << e.what()); + throw qpid::messaging::TransportFailure(e.what()); + } catch (const std::exception& e) { + QPID_LOG(info, "Error while connecting to " << u << ": " << e.what()); + throw qpid::messaging::MessagingException(e.what()); + } +} + +void ConnectionImpl::reconnect() +{ + if (!tryConnect()) { + throw qpid::messaging::TransportFailure("Could not reconnect"); + } +} +std::string ConnectionImpl::getUrl() const +{ + if (isOpen()) { + std::stringstream u; + u << connection.getNegotiatedSettings().protocol << COLON << connection.getNegotiatedSettings().host << COLON << connection.getNegotiatedSettings().port; + return u.str(); + } else { + return std::string(); + } +} + std::string ConnectionImpl::getAuthenticatedUsername() { return connection.getNegotiatedSettings().username; diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h index 00ac30a6df..ae839dc690 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h @@ -53,6 +53,9 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl void setOption(const std::string& name, const qpid::types::Variant& value); bool backoff(); std::string getAuthenticatedUsername(); + void reconnect(const std::string& url); + void reconnect(); + std::string getUrl() const; bool getAutoDecode() const; private: typedef std::map<std::string, qpid::messaging::Session> Sessions; @@ -64,7 +67,7 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl bool replaceUrls; // Replace rather than merging with reconnect-urls std::vector<std::string> urls; qpid::client::ConnectionSettings settings; - bool reconnect; + bool autoReconnect; double timeout; int32_t limit; double minReconnectInterval; diff --git a/qpid/cpp/src/qpid/messaging/Connection.cpp b/qpid/cpp/src/qpid/messaging/Connection.cpp index fde931038b..0993ef5dd9 100644 --- a/qpid/cpp/src/qpid/messaging/Connection.cpp +++ b/qpid/cpp/src/qpid/messaging/Connection.cpp @@ -90,4 +90,18 @@ std::string Connection::getAuthenticatedUsername() { return impl->getAuthenticatedUsername(); } + +void Connection::reconnect(const std::string& url) +{ + impl->reconnect(url); +} +void Connection::reconnect() +{ + impl->reconnect(); +} +std::string Connection::getUrl() const +{ + return impl->getUrl(); +} + }} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/ConnectionImpl.h b/qpid/cpp/src/qpid/messaging/ConnectionImpl.h index 1e11d9a6d5..92c6d91b10 100644 --- a/qpid/cpp/src/qpid/messaging/ConnectionImpl.h +++ b/qpid/cpp/src/qpid/messaging/ConnectionImpl.h @@ -45,6 +45,9 @@ class ConnectionImpl : public virtual qpid::RefCounted virtual Session getSession(const std::string& name) const = 0; virtual void setOption(const std::string& name, const qpid::types::Variant& value) = 0; virtual std::string getAuthenticatedUsername() = 0; + virtual void reconnect(const std::string& url) = 0; + virtual void reconnect() = 0; + virtual std::string getUrl() const = 0; private: }; }} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index 0d4885c4c3..afa2db6791 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -79,7 +79,7 @@ ConnectionContext::~ConnectionContext() bool ConnectionContext::isOpen() const { qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); - return pn_connection_state(connection) & (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE); + return state == CONNECTED && pn_connection_state(connection) & (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE); } void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn) @@ -399,7 +399,7 @@ void ConnectionContext::check() if (state == DISCONNECTED) { if (ConnectionOptions::reconnect) { reset(); - reconnect(); + autoconnect(); } else { throw qpid::messaging::TransportFailure("Disconnected (reconnect disabled)"); } @@ -794,7 +794,7 @@ bool expired(const sys::AbsTime& start, double timeout) const std::string COLON(":"); } -void ConnectionContext::reconnect() +void ConnectionContext::autoconnect() { qpid::sys::AbsTime started(qpid::sys::now()); QPID_LOG(debug, "Starting connection, urls=" << asString(urls)); @@ -821,29 +821,7 @@ bool ConnectionContext::tryConnect() try { QPID_LOG(info, "Trying to connect to " << *i << "..."); if (tryConnect(qpid::Url(*i, protocol.empty() ? qpid::Address::TCP : protocol))) { - QPID_LOG(info, "Connected to " << *i); - if (sasl.get()) { - wakeupDriver(); - while (!sasl->authenticated()) { - QPID_LOG(debug, id << " Waiting to be authenticated..."); - wait(); - } - QPID_LOG(debug, id << " Authenticated"); - } - - QPID_LOG(debug, id << " Opening..."); - setProperties(); - pn_connection_open(connection); - wakeupDriver(); //want to write - while (pn_connection_state(connection) & PN_REMOTE_UNINIT) { - wait(); - } - if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) { - throw qpid::messaging::ConnectionError("Failed to open connection"); - } - QPID_LOG(debug, id << " Opened"); - - return restartSessions(); + return true; } } catch (const qpid::messaging::TransportFailure& e) { QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what()); @@ -852,9 +830,26 @@ bool ConnectionContext::tryConnect() return false; } -bool ConnectionContext::tryConnect(const std::string& url) +void ConnectionContext::reconnect(const std::string& url) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!"); + if (!driver) driver = DriverImpl::getDefault(); + reset(); + if (!tryConnect(qpid::Url(url, protocol.empty() ? qpid::Address::TCP : protocol))) { + throw qpid::messaging::TransportFailure("Failed to connect"); + } +} + +void ConnectionContext::reconnect() { - return tryConnect(qpid::Url(url, protocol.empty() ? qpid::Address::TCP : protocol)); + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!"); + if (!driver) driver = DriverImpl::getDefault(); + reset(); + if (!tryConnect()) { + throw qpid::messaging::TransportFailure("Failed to reconnect"); + } } bool ConnectionContext::tryConnect(const Url& url) @@ -863,11 +858,54 @@ bool ConnectionContext::tryConnect(const Url& url) if (url.getPass().size()) password = url.getPass(); for (Url::const_iterator i = url.begin(); i != url.end(); ++i) { - if (tryConnect(*i)) return true; + if (tryConnect(*i)) { + QPID_LOG(info, "Connected to " << *i); + setCurrentUrl(*i); + if (sasl.get()) { + wakeupDriver(); + while (!sasl->authenticated()) { + QPID_LOG(debug, id << " Waiting to be authenticated..."); + wait(); + } + QPID_LOG(debug, id << " Authenticated"); + } + + QPID_LOG(debug, id << " Opening..."); + setProperties(); + pn_connection_open(connection); + wakeupDriver(); //want to write + while (pn_connection_state(connection) & PN_REMOTE_UNINIT) { + wait(); + } + if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) { + throw qpid::messaging::ConnectionError("Failed to open connection"); + } + QPID_LOG(debug, id << " Opened"); + + return restartSessions(); + } } return false; } +void ConnectionContext::setCurrentUrl(const qpid::Address& a) +{ + std::stringstream u; + u << a; + currentUrl = u.str(); +} + +std::string ConnectionContext::getUrl() const +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + if (state == CONNECTED) { + return currentUrl; + } else { + return std::string(); + } +} + + bool ConnectionContext::tryConnect(const qpid::Address& address) { transport = driver->getTransport(address.protocol, *this); diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h index 3b80f71f1d..7387dba13e 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h @@ -106,6 +106,9 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag framing::ProtocolVersion getVersion() const; //additionally, Transport needs: void opened();//signal successful connection + void reconnect(const std::string& url); + void reconnect(); + std::string getUrl() const; const qpid::sys::SecuritySettings* getTransportSecuritySettings(); private: @@ -122,6 +125,7 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag bool readHeader; bool haveOutput; std::string id; + std::string currentUrl; enum { DISCONNECTED, CONNECTING, @@ -144,14 +148,14 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag void checkClosed(boost::shared_ptr<SessionContext>, pn_link_t*); void wakeupDriver(); void attach(pn_link_t*, int credit=0); - void reconnect(); + void autoconnect(); bool tryConnect(); - bool tryConnect(const std::string& url); bool tryConnect(const qpid::Url& url); bool tryConnect(const qpid::Address& address); void reset(); bool restartSessions(); void restartSession(boost::shared_ptr<SessionContext>); + void setCurrentUrl(const qpid::Address&); std::size_t decodePlain(const char* buffer, std::size_t size); std::size_t encodePlain(char* buffer, std::size_t size); diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp index 0c4ec2bfcb..c1ab108a61 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.cpp @@ -81,4 +81,17 @@ std::string ConnectionHandle::getAuthenticatedUsername() return connection->getAuthenticatedUsername(); } +void ConnectionHandle::reconnect(const std::string& url) +{ + connection->reconnect(url); +} +void ConnectionHandle::reconnect() +{ + connection->reconnect(); +} +std::string ConnectionHandle::getUrl() const +{ + return connection->getUrl(); +} + }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h index d1eb27f6de..0238313f93 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionHandle.h @@ -49,6 +49,9 @@ class ConnectionHandle : public qpid::messaging::ConnectionImpl Session getSession(const std::string& name) const; void setOption(const std::string& name, const qpid::types::Variant& value); std::string getAuthenticatedUsername(); + void reconnect(const std::string& url); + void reconnect(); + std::string getUrl() const; private: boost::shared_ptr<ConnectionContext> connection; }; |