summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-11-07 14:08:29 +0000
committerGordon Sim <gsim@apache.org>2008-11-07 14:08:29 +0000
commit53247faba484516d9c468b7ae7f514f32541295a (patch)
tree2a823dcf2f292479ba122a25a1fadec3837f9562
parent0e0d76bedfb1f369ab0a71a62c931c13d66e0b7e (diff)
downloadqpid-python-53247faba484516d9c468b7ae7f514f32541295a.tar.gz
* Added some doxygen comments for FailoverManager
* Added means for application to alter the order in which urls are tried (or indeed the list of urls to try) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@712127 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/examples/failover/declare_queues.cpp73
-rw-r--r--qpid/cpp/examples/failover/resuming_receiver.cpp16
-rw-r--r--qpid/cpp/src/qpid/client/FailoverManager.cpp10
-rw-r--r--qpid/cpp/src/qpid/client/FailoverManager.h59
4 files changed, 101 insertions, 57 deletions
diff --git a/qpid/cpp/examples/failover/declare_queues.cpp b/qpid/cpp/examples/failover/declare_queues.cpp
index 14e4a1e3cb..a677870c53 100644
--- a/qpid/cpp/examples/failover/declare_queues.cpp
+++ b/qpid/cpp/examples/failover/declare_queues.cpp
@@ -19,67 +19,40 @@
*
*/
-#include <qpid/client/FailoverConnection.h>
+#include <qpid/client/FailoverManager.h>
#include <qpid/client/Session.h>
+#include <qpid/Exception.h>
-#include <unistd.h>
#include <cstdlib>
#include <iostream>
-#include <fstream>
using namespace qpid::client;
-using namespace qpid::framing;
-
using namespace std;
-
-
-
-int
-main ( int argc, char ** argv)
+int main(int argc, char ** argv)
{
- if ( argc < 3 )
- {
- std::cerr << "Usage: ./declare_queues host cluster_port_file_name\n";
- std::cerr << "i.e. for host: 127.0.0.1\n";
- exit(1);
- }
-
- const char * host = argv[1];
- int port = atoi(argv[2]);
-
-
- try
- {
- FailoverConnection connection;
- FailoverSession * session;
-
- connection.open ( host, port );
- session = connection.newSession();
-
- session->queueDeclare ( "message_queue");
+ ConnectionSettings settings;
+ if (argc > 1) settings.host = argv[1];
+ if (argc > 2) settings.port = atoi(argv[2]);
+
+ FailoverManager connection(settings);
+ try {
+ bool complete = false;
+ while (!complete) {
+ Session session = connection.connect().newSession();
+ try {
+ session.queueDeclare(arg::queue="message_queue");
+ complete = true;
+ } catch (const qpid::TransportFailure&) {}
+ }
+ connection.close();
+ return 0;
+ } catch (const std::exception& error) {
+ std::cout << "Failed:" << error.what() << std::endl;
+ return 1;
+ }
- /*
- session->exchangeBind
- ( arg::exchange="amq.direct",
- arg::queue="message_queue",
- arg::bindingKey="routing_key"
- );
- * */
- session->exchangeBind ( "message_queue",
- "amq.direct",
- "routing_key"
- );
- connection.close();
- return 0;
- }
- catch ( const std::exception& error )
- {
- std::cout << error.what() << std::endl;
- }
-
- return 1;
}
diff --git a/qpid/cpp/examples/failover/resuming_receiver.cpp b/qpid/cpp/examples/failover/resuming_receiver.cpp
index 3c1df92ed1..d1886ce861 100644
--- a/qpid/cpp/examples/failover/resuming_receiver.cpp
+++ b/qpid/cpp/examples/failover/resuming_receiver.cpp
@@ -35,13 +35,16 @@ using namespace qpid::framing;
using namespace std;
-class Listener : public MessageListener, public FailoverManager::Command
+class Listener : public MessageListener,
+ public FailoverManager::Command,
+ public FailoverManager::ReconnectionStrategy
{
public:
Listener();
void received(Message& message);
void execute(AsyncSession& session, bool isRetry);
void check();
+ void editUrlList(std::vector<Url>& urls);
private:
Subscription subscription;
uint count;
@@ -90,14 +93,23 @@ void Listener::execute(AsyncSession& session, bool isRetry)
subs.run();
}
+void Listener::editUrlList(std::vector<Url>& urls)
+{
+ /**
+ * A more realistic algorithm would be to search through the list
+ * for prefered hosts and ensure they come first in the list.
+ */
+ if (urls.size() > 1) std::rotate(urls.begin(), urls.begin() + 1, urls.end());
+}
+
int main(int argc, char ** argv)
{
ConnectionSettings settings;
if (argc > 1) settings.host = argv[1];
if (argc > 2) settings.port = atoi(argv[2]);
- FailoverManager connection(settings);
Listener listener;
+ FailoverManager connection(settings, &listener);
try {
connection.execute(listener);
diff --git a/qpid/cpp/src/qpid/client/FailoverManager.cpp b/qpid/cpp/src/qpid/client/FailoverManager.cpp
index 73c6bfc2de..ab9dbca70f 100644
--- a/qpid/cpp/src/qpid/client/FailoverManager.cpp
+++ b/qpid/cpp/src/qpid/client/FailoverManager.cpp
@@ -28,7 +28,8 @@ namespace client {
using qpid::sys::Monitor;
-FailoverManager::FailoverManager(const ConnectionSettings& s) : settings(s), state(IDLE) {}
+FailoverManager::FailoverManager(const ConnectionSettings& s,
+ ReconnectionStrategy* rs) : settings(s), strategy(rs), state(IDLE) {}
void FailoverManager::execute(Command& c)
{
@@ -38,11 +39,11 @@ void FailoverManager::execute(Command& c)
try {
AsyncSession session = connect().newSession();
c.execute(session, retry);
- session.sync();//TODO: shouldn't be required, but seems there is a bug in session
+ session.sync();//TODO: shouldn't be required
session.close();
completed = true;
} catch(const TransportFailure&) {
- retry = true;
+ retry = true;
}
}
}
@@ -86,6 +87,7 @@ Connection& FailoverManager::getConnection()
void FailoverManager::attempt(Connection& c, ConnectionSettings s, std::vector<Url> urls)
{
Monitor::ScopedUnlock u(lock);
+ if (strategy) strategy->editUrlList(urls);
if (urls.empty()) {
attempt(c, s);
} else {
@@ -105,7 +107,9 @@ void FailoverManager::attempt(Connection& c, ConnectionSettings s, std::vector<U
void FailoverManager::attempt(Connection& c, ConnectionSettings s)
{
try {
+ QPID_LOG(info, "Attempting to connect to " << s.host << " on " << s.port << "...");
c.open(s);
+ QPID_LOG(info, "Connected to " << s.host << " on " << s.port);
} catch (const Exception& e) {
QPID_LOG(info, "Could not connect to " << s.host << " on " << s.port << ": " << e.what());
}
diff --git a/qpid/cpp/src/qpid/client/FailoverManager.h b/qpid/cpp/src/qpid/client/FailoverManager.h
index d17b8371d0..5af2a8f994 100644
--- a/qpid/cpp/src/qpid/client/FailoverManager.h
+++ b/qpid/cpp/src/qpid/client/FailoverManager.h
@@ -38,21 +38,75 @@ struct CannotConnectException : qpid::Exception
};
/**
- * Utility to handle reconnection.
+ * Utility to manage failover.
*/
class FailoverManager
{
public:
+ /**
+ * Interface to implement for doing work that can be resumed on
+ * failover
+ */
struct Command
{
+ /**
+ * This method will be called with isRetry=false when the
+ * command is first executed. The session to use for the work
+ * will be passed to the implementing class. If the connection
+ * fails while the execute call is in progress, the
+ * FailoverManager controlling the execution will re-establish
+ * a connection, open a new session and call back to the
+ * Command implementations execute method with the new session
+ * and isRetry=true.
+ */
virtual void execute(AsyncSession& session, bool isRetry) = 0;
virtual ~Command() {}
};
- FailoverManager(const ConnectionSettings& settings);
+ struct ReconnectionStrategy
+ {
+ /**
+ * This method is called by the FailoverManager prior to
+ * establishing a connection (or re-connection) and can be
+ * used if the application wishes to edit or re-order the list
+ * which will default to the list of known brokers for the
+ * last connection.
+ */
+ virtual void editUrlList(std::vector<Url>& urls) = 0;
+ virtual ~ReconnectionStrategy() {}
+ };
+
+ /**
+ * Create a manager to control failover for a logical connection.
+ *
+ * @param settings the initial connection settings
+ * @param strategy optional stratgey callback allowing application
+ * to edit or reorder the list of urls to which reconnection is
+ * attempted
+ */
+ FailoverManager(const ConnectionSettings& settings, ReconnectionStrategy* strategy = 0);
+ /**
+ * Return the current connection if open or attept to reconnect to
+ * the specified list of urls. If no list is specified the list of
+ * known brokers from the last connection will be used. If no list
+ * is specified and this is the first connect attempt, the host
+ * and port from the initial settings will be used.
+ */
Connection& connect(std::vector<Url> brokers = std::vector<Url>());
+ /**
+ * Return the current connection whether open or not
+ */
Connection& getConnection();
+ /**
+ * Close the current connection
+ */
void close();
+ /**
+ * Reliably execute the specified command. This involves creating
+ * a session on which to carry out the work of the command,
+ * handling failover occuring while exeuting that command and
+ * re-starting the work.
+ */
void execute(Command&);
private:
enum State {IDLE, CONNECTING, CANT_CONNECT};
@@ -60,6 +114,7 @@ class FailoverManager
qpid::sys::Monitor lock;
Connection connection;
ConnectionSettings settings;
+ ReconnectionStrategy* strategy;
State state;
void attempt(Connection&, ConnectionSettings settings, std::vector<Url> urls);