diff options
author | Alan Conway <aconway@apache.org> | 2008-10-10 04:49:48 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-10-10 04:49:48 +0000 |
commit | 5d07d177cfc5eca21c44981bbe342f0cdcced4e5 (patch) | |
tree | 0f5f83642ed5effed52a5e2547565362ce2aea8c | |
parent | e7ceead683231ef2cb35a6ee70488e859f023d12 (diff) | |
download | qpid-python-5d07d177cfc5eca21c44981bbe342f0cdcced4e5.tar.gz |
QPID-1340 froM Mick Goulish: preliminary client-side failover support.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703319 13f79535-47bb-0310-9956-ffa450edef68
27 files changed, 2302 insertions, 19 deletions
diff --git a/cpp/configure.ac b/cpp/configure.ac index 843a032f9c..02080d4c6f 100644 --- a/cpp/configure.ac +++ b/cpp/configure.ac @@ -331,6 +331,7 @@ AC_CONFIG_FILES([ examples/fanout/Makefile examples/pub-sub/Makefile examples/request-response/Makefile + examples/failover/Makefile examples/xml-exchange/Makefile managementgen/Makefile etc/Makefile diff --git a/cpp/examples/Makefile.am b/cpp/examples/Makefile.am index 5415cae1a9..57ad22ec78 100644 --- a/cpp/examples/Makefile.am +++ b/cpp/examples/Makefile.am @@ -1,4 +1,4 @@ -SUBDIRS = direct fanout pub-sub request-response +SUBDIRS = direct fanout pub-sub request-response failover if HAVE_XML SUBDIRS += xml-exchange endif diff --git a/cpp/examples/failover/Makefile.am b/cpp/examples/failover/Makefile.am new file mode 100644 index 0000000000..8fe6b8cba7 --- /dev/null +++ b/cpp/examples/failover/Makefile.am @@ -0,0 +1,21 @@ +examplesdir=$(pkgdatadir)/examples/direct + +include $(top_srcdir)/examples/makedist.mk + +noinst_PROGRAMS=direct_producer listener declare_queues +direct_producer_SOURCES=direct_producer.cpp +direct_producer_LDADD=$(CLIENT_LIB) + +listener_SOURCES=listener.cpp +listener_LDADD=$(CLIENT_LIB) + +declare_queues_SOURCES=declare_queues.cpp +declare_queues_LDADD=$(CLIENT_LIB) + +examples_DATA= \ + direct_producer.cpp \ + listener.cpp \ + declare_queues.cpp \ + $(MAKEDIST) + +# FIXME aconway 2008-10-10: add verify scripts. diff --git a/cpp/examples/failover/declare_queues.cpp b/cpp/examples/failover/declare_queues.cpp new file mode 100644 index 0000000000..14e4a1e3cb --- /dev/null +++ b/cpp/examples/failover/declare_queues.cpp @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/client/FailoverConnection.h> +#include <qpid/client/Session.h> + +#include <unistd.h> +#include <cstdlib> +#include <iostream> +#include <fstream> + +using namespace qpid::client; +using namespace qpid::framing; + + +using namespace std; + + + + +int +main ( int argc, char ** argv) +{ + if ( argc < 3 ) + { + std::cerr << "Usage: ./declare_queues host cluster_port_file_name\n"; + std::cerr << "i.e. for host: 127.0.0.1\n"; + exit(1); + } + + const char * host = argv[1]; + int port = atoi(argv[2]); + + + try + { + FailoverConnection connection; + FailoverSession * session; + + connection.open ( host, port ); + session = connection.newSession(); + + session->queueDeclare ( "message_queue"); + + /* + session->exchangeBind + ( arg::exchange="amq.direct", + arg::queue="message_queue", + arg::bindingKey="routing_key" + ); + * */ + session->exchangeBind ( "message_queue", + "amq.direct", + "routing_key" + ); + connection.close(); + return 0; + } + catch ( const std::exception& error ) + { + std::cout << error.what() << std::endl; + } + + return 1; +} + + + + + diff --git a/cpp/examples/failover/direct_producer.cpp b/cpp/examples/failover/direct_producer.cpp new file mode 100644 index 0000000000..9a510b6fb3 --- /dev/null +++ b/cpp/examples/failover/direct_producer.cpp @@ -0,0 +1,148 @@ +#include <qpid/client/FailoverConnection.h> +#include <qpid/client/Session.h> +#include <qpid/client/AsyncSession.h> +#include <qpid/client/Message.h> + + +#include <unistd.h> +#include <cstdlib> +#include <iostream> +#include <fstream> + +#include <sstream> + +using namespace qpid::client; +using namespace qpid::framing; + +using namespace std; + + + + +int +main ( int argc, char ** argv) +{ + struct timeval broker_killed_time = {0,0}, + failover_complete_time = {0,0}, + duration = {0,0}; + + + if ( argc < 3 ) + { + std::cerr << "Usage: ./direct_producer host cluster_port_file_name\n"; + std::cerr << "i.e. for host: 127.0.0.1\n"; + exit(1); + } + + char const * host = argv[1]; + int port = atoi(argv[2]); + char const * broker_to_kill = 0; + + if ( argc > 3 ) + { + broker_to_kill = argv[3]; + std::cerr << "main: Broker marked for death is process ID " + << broker_to_kill + << endl; + } + else + { + std::cerr << "PRODUCER main: there is no broker to kill.\n"; + } + + FailoverConnection connection; + FailoverSession * session; + Message message; + + string program_name = "PRODUCER"; + + + connection.failoverCompleteTime = & failover_complete_time; + connection.name = program_name; + connection.open ( host, port ); + + session = connection.newSession(); + session->name = program_name; + + int send_this_many = 30, + messages_sent = 0; + + while ( messages_sent < send_this_many ) + { + if ( (messages_sent == 13) && broker_to_kill ) + { + char command[1000]; + std::cerr << program_name << " killing broker " << broker_to_kill << ".\n"; + sprintf(command, "kill -9 %s", broker_to_kill); + system ( command ); + gettimeofday ( & broker_killed_time, 0 ); + } + + message.getDeliveryProperties().setRoutingKey("routing_key"); + + std::cerr << "sending message " + << messages_sent + << " of " + << send_this_many + << ".\n"; + + stringstream message_data; + message_data << messages_sent; + message.setData(message_data.str()); + + try + { + /* MICK FIXME + session.messageTransfer ( arg::content=message, + arg::destination="amq.direct" + ); */ + session->messageTransfer ( "amq.direct", + 1, + 0, + message + ); + } + catch ( const std::exception& error) + { + cerr << program_name << " exception: " << error.what() << endl; + } + + sleep ( 1 ); + ++ messages_sent; + } + + message.setData ( "That's all, folks!" ); + + /* MICK FIXME + session.messageTransfer ( arg::content=message, + arg::destination="amq.direct" + ); + */ + session->messageTransfer ( "amq.direct", + 1, + 0, + message + ); + + session->sync(); + connection.close(); + + // This will be incorrect if you killed more than one... + if ( broker_to_kill ) + { + timersub ( & failover_complete_time, + & broker_killed_time, + & duration + ); + fprintf ( stderr, + "Failover time: %ld.%.6ld\n", + duration.tv_sec, + duration.tv_usec + ); + } + + return 0; +} + + + diff --git a/cpp/examples/failover/listener.cpp b/cpp/examples/failover/listener.cpp new file mode 100644 index 0000000000..c58a3b5e71 --- /dev/null +++ b/cpp/examples/failover/listener.cpp @@ -0,0 +1,249 @@ + +#include <qpid/client/FailoverConnection.h> +#include <qpid/client/Session.h> +#include <qpid/client/Message.h> +#include <qpid/client/SubscriptionManager.h> + +#include <unistd.h> +#include <cstdlib> +#include <iostream> +#include <fstream> + + +using namespace qpid::client; +using namespace qpid::framing; + +using namespace std; + + + + +struct Recorder +{ + unsigned int max_messages; + unsigned int * messages_received; + + Recorder ( ) + { + max_messages = 1000; + messages_received = new unsigned int [ max_messages ]; + memset ( messages_received, 0, max_messages * sizeof(int) ); + } + + + void + received ( int i ) + { + messages_received[i] ++; + } + + + + void + report ( ) + { + int i; + + int last_received_message = 0; + + vector<unsigned int> missed_messages, + multiple_messages; + + /*---------------------------------------------------- + Collect indices of missed and multiple messages. + ----------------------------------------------------*/ + bool seen_first_message = false; + for ( i = max_messages - 1; i >= 0; -- i ) + { + if ( ! seen_first_message ) + { + if ( messages_received [i] > 0 ) + { + seen_first_message = true; + last_received_message = i; + } + } + else + { + if ( messages_received [i] == 0 ) + missed_messages.push_back ( i ); + else + if ( messages_received [i] > 1 ) + { + multiple_messages.push_back ( i ); + } + } + } + + /*-------------------------------------------- + Report missed messages. + --------------------------------------------*/ + char const * verb = ( missed_messages.size() == 1 ) + ? " was " + : " were "; + + char const * plural = ( missed_messages.size() == 1 ) + ? "." + : "s."; + + std::cerr << "Listener::shutdown: There" + << verb + << missed_messages.size() + << " missed message" + << plural + << endl; + + for ( i = 0; i < int(missed_messages.size()); ++ i ) + { + std::cerr << " " << i << " was missed.\n"; + } + + + /*-------------------------------------------- + Report multiple messages. + --------------------------------------------*/ + verb = ( multiple_messages.size() == 1 ) + ? " was " + : " were "; + + plural = ( multiple_messages.size() == 1 ) + ? "." + : "s."; + + std::cerr << "Listener::shutdown: There" + << verb + << multiple_messages.size() + << " multiple message" + << plural + << endl; + + for ( i = 0; i < int(multiple_messages.size()); ++ i ) + { + std::cerr << " " + << multiple_messages[i] + << " was received " + << messages_received [ multiple_messages[i] ] + << " times.\n"; + } + + /* + for ( i = 0; i < last_received_message; ++ i ) + { + std::cerr << "Message " << i << ": " << messages_received[i] << std::endl; + } + */ + } + +}; + + + + +struct Listener : public MessageListener +{ + FailoverSubscriptionManager & subscriptionManager; + Recorder & recorder; + + + Listener ( FailoverSubscriptionManager& subs, + Recorder & recorder + ); + + void shutdown() { recorder.report(); } + void parse_message ( std::string const & msg ); + + virtual void received ( Message & message ); +}; + + + + + +Listener::Listener ( FailoverSubscriptionManager & s, Recorder & r ) : + subscriptionManager(s), + recorder(r) +{ +} + + + + + +void +Listener::received ( Message & message ) +{ + std::cerr << "Listener received: " << message.getData() << std::endl; + if (message.getData() == "That's all, folks!") + { + std::cout << "Shutting down listener for " << message.getDestination() + << std::endl; + subscriptionManager.cancel(message.getDestination()); + + shutdown(); + } + else + { + parse_message ( message.getData() ); + } +} + + + + + +void +Listener::parse_message ( const std::string & msg ) +{ + int msg_number; + if(1 != sscanf ( msg.c_str(), "%d", & msg_number ) ) + { + std::cerr << "Listener::parse_message error: Can't read message number from this message: |" << msg_number << "|\n"; + return; + } + recorder.received ( msg_number ); +} + + + + + + +int +main ( int argc, char ** argv ) +{ + string program_name = "LISTENER"; + + if ( argc < 3 ) + { + std::cerr << "Usage: ./listener host cluster_port_file_name\n"; + std::cerr << "i.e. for host: 127.0.0.1\n"; + exit(1); + } + + char const * host = argv[1]; + int port = atoi(argv[2]); + + FailoverConnection connection; + FailoverSession * session; + Recorder recorder; + + connection.name = program_name; + + connection.open ( host, port ); + session = connection.newSession(); + session->name = program_name; + + FailoverSubscriptionManager subscriptions ( session ); + subscriptions.name = program_name; + Listener listener ( subscriptions, recorder ); + subscriptions.subscribe ( listener, "message_queue" ); + subscriptions.run ( ); + + connection.close(); + + return 1; +} + + + + diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 86660b89ac..528236dd62 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -371,11 +371,14 @@ libqpidclient_la_SOURCES = \ qpid/client/Bounds.cpp \ qpid/client/Connection.cpp \ qpid/client/ConnectionHandler.cpp \ - qpid/client/ConnectionImpl.cpp \ + qpid/client/ConnectionImpl.cpp \ qpid/client/ConnectionSettings.cpp \ - qpid/client/Connector.cpp \ + qpid/client/Connector.cpp \ qpid/client/Demux.cpp \ qpid/client/Dispatcher.cpp \ + qpid/client/FailoverConnection.cpp \ + qpid/client/FailoverSession.cpp \ + qpid/client/FailoverSubscriptionManager.cpp \ qpid/client/FailoverListener.h \ qpid/client/FailoverListener.cpp \ qpid/client/Future.cpp \ @@ -508,6 +511,9 @@ nobase_include_HEADERS = \ qpid/client/Demux.h \ qpid/client/Dispatcher.h \ qpid/client/Execution.h \ + qpid/client/FailoverConnection.h \ + qpid/client/FailoverSession.h \ + qpid/client/FailoverSubscriptionManager.h \ qpid/client/FlowControl.h \ qpid/client/Future.h \ qpid/client/FutureCompletion.h \ diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 8fffebf04b..9a3925b053 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -140,7 +140,8 @@ Broker::Broker(const Broker::Options& conf) : qpid::SessionState::Configuration( conf.replayFlushLimit*1024, // convert kb to bytes. conf.replayHardLimit*1024), - *this) + *this), + getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)) { if(conf.enableMgmt){ QPID_LOG(info, "Management enabled"); @@ -426,5 +427,15 @@ uint32_t Broker::queueMoveMessages( boost::shared_ptr<sys::Poller> Broker::getPoller() { return poller; } +std::vector<Url> +Broker::getKnownBrokersImpl() +{ + knownBrokers.clear(); + knownBrokers.push_back ( qpid::Url::getIpAddressesUrl ( getPort() ) ); + return knownBrokers; +} + + + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 905b16d54f..fec32d620d 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -113,6 +113,9 @@ class Broker : public sys::Runnable, public Plugin::Target, void declareStandardExchange(const std::string& name, const std::string& type); + std::vector<Url> knownBrokers; + std::vector<Url> getKnownBrokersImpl(); + public: @@ -191,6 +194,9 @@ class Broker : public sys::Runnable, public Plugin::Target, boost::shared_ptr<sys::ConnectionCodec::Factory> getConnectionFactory() { return factory; } void setConnectionFactory(boost::shared_ptr<sys::ConnectionCodec::Factory> f) { factory = f; } + + boost::function<std::vector<Url> ()> getKnownBrokers; + }; }} diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp index fae4992270..c47037cf9c 100644 --- a/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -28,6 +28,7 @@ #include "qpid/framing/ServerInvoker.h" #include "qpid/framing/enum.h" #include "qpid/log/Statement.h" +#include "qpid/Url.h" #include "AclModule.h" using namespace qpid; @@ -127,8 +128,11 @@ void ConnectionHandler::Handler::tuneOk(uint16_t /*channelmax*/, void ConnectionHandler::Handler::open(const string& /*virtualHost*/, const framing::Array& /*capabilities*/, bool /*insist*/) { - framing::Array knownhosts; - client.openOk(knownhosts); + std::vector<Url> urls = connection.broker.getKnownBrokers(); + framing::Array array(0x95); // str16 array + for (std::vector<Url>::iterator i = urls.begin(); i < urls.end(); ++i) + array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str()))); + client.openOk(array); } diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp index 27706fab8c..3ee70c222a 100644 --- a/cpp/src/qpid/client/Connection.cpp +++ b/cpp/src/qpid/client/Connection.cpp @@ -99,6 +99,15 @@ bool Connection::isOpen() const { return impl && impl->isOpen(); } +void +Connection::registerFailureCallback ( boost::function<void ()> fn ) { + failureCallback = fn; + if ( impl ) + impl->registerFailureCallback ( fn ); +} + + + void Connection::open(const ConnectionSettings& settings) { if (isOpen()) @@ -106,6 +115,8 @@ void Connection::open(const ConnectionSettings& settings) impl = shared_ptr<ConnectionImpl>(new ConnectionImpl(version, settings)); impl->open(); + if ( failureCallback ) + impl->registerFailureCallback ( failureCallback ); } Session Connection::newSession(const std::string& name, uint32_t timeout) { diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h index a5ea40ff38..9595e683e1 100644 --- a/cpp/src/qpid/client/Connection.h +++ b/cpp/src/qpid/client/Connection.h @@ -45,9 +45,13 @@ class Connection { framing::ProtocolVersion version; + boost::function<void ()> failureCallback; + + protected: boost::shared_ptr<ConnectionImpl> impl; + public: /** * Creates a connection object, but does not open the connection. @@ -168,6 +172,7 @@ class Connection bool isOpen() const; std::vector<Url> getKnownBrokers(); + void registerFailureCallback ( boost::function<void ()> fn ); friend class ConnectionAccess; ///<@internal friend class SessionBase_0_10; ///<@internal diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp index 7f1cd5ce7f..b2245077e2 100644 --- a/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/cpp/src/qpid/client/ConnectionHandler.cpp @@ -157,13 +157,17 @@ void ConnectionHandler::tune(uint16_t maxChannelsProposed, uint16_t maxFrameSize proxy.open(virtualhost, capabilities, insist); } -void ConnectionHandler::openOk(const framing::Array& /*knownHosts*/) +void ConnectionHandler::openOk ( const framing::Array& knownBrokers ) { checkState(OPENING, INVALID_STATE_OPEN_OK); - //TODO: store knownHosts for reconnection etc + knownBrokersUrls.clear(); + framing::Array::ValueVector::const_iterator i; + for ( i = knownBrokers.begin(); i != knownBrokers.end(); ++i ) + knownBrokersUrls.push_back(Url((*i)->get<std::string>())); setState(OPEN); } + void ConnectionHandler::redirect(const std::string& /*host*/, const Array& /*knownHosts*/) { throw NotImplementedException("Redirection received from broker; not yet implemented in client"); diff --git a/cpp/src/qpid/client/ConnectionHandler.h b/cpp/src/qpid/client/ConnectionHandler.h index 40be3a5237..28ca875ace 100644 --- a/cpp/src/qpid/client/ConnectionHandler.h +++ b/cpp/src/qpid/client/ConnectionHandler.h @@ -32,6 +32,7 @@ #include "qpid/framing/FieldTable.h" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/InputHandler.h" +#include "qpid/Url.h" namespace qpid { namespace client { @@ -103,6 +104,8 @@ public: CloseListener onClose; ErrorListener onError; + + std::vector<Url> knownBrokersUrls; }; }} diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index 910c908ee2..93eea50e43 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -151,13 +151,20 @@ static const std::string CONN_CLOSED("Connection closed by broker"); void ConnectionImpl::shutdown() { Mutex::ScopedLock l(lock); - if (handler.isClosed()) return; + if (handler.isClosed()) + { + std::cerr << "MDEBUG ConnectionImpl::shutdown -- returning w/o failure callback!\n"; + return; + } // FIXME aconway 2008-06-06: exception use, amqp0-10 does not seem to have // an appropriate close-code. connection-forced is not right. if (!handler.isClosing()) closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, CLOSE_CODE_CONNECTION_FORCED, CONN_CLOSED)); setException(new ConnectionException(CLOSE_CODE_CONNECTION_FORCED, CONN_CLOSED)); handler.fail(CONN_CLOSED); + + if ( failureCallback ) + failureCallback(); } void ConnectionImpl::erase(uint16_t ch) { @@ -171,8 +178,8 @@ const ConnectionSettings& ConnectionImpl::getNegotiatedSettings() } std::vector<qpid::Url> ConnectionImpl::getKnownBrokers() { - // FIXME aconway 2008-10-08: initialize failover list from openOk or settings - return failover ? failover->getKnownBrokers() : std::vector<qpid::Url>(); + // FIXME aconway 2008-10-08: ensure we never return empty list, always include self Url. + return failover ? failover->getKnownBrokers() : handler.knownBrokersUrls; } boost::shared_ptr<SessionImpl> ConnectionImpl::newSession(const std::string& name, uint32_t timeout, uint16_t channel) { diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h index 22450a7ddf..a432afff4f 100644 --- a/cpp/src/qpid/client/ConnectionImpl.h +++ b/cpp/src/qpid/client/ConnectionImpl.h @@ -69,6 +69,8 @@ class ConnectionImpl : public Bounds, void idleIn(); void shutdown(); + boost::function<void ()> failureCallback; + public: ConnectionImpl(framing::ProtocolVersion version, const ConnectionSettings& settings); ~ConnectionImpl(); @@ -82,12 +84,11 @@ class ConnectionImpl : public Bounds, void close(); void handle(framing::AMQFrame& frame); void erase(uint16_t channel); - void stopFailoverListener(); - const ConnectionSettings& getNegotiatedSettings(); std::vector<Url> getKnownBrokers(); - + void registerFailureCallback ( boost::function<void ()> fn ) { failureCallback = fn; } + void stopFailoverListener(); }; }} diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp index 5028d68405..08905bc96c 100644 --- a/cpp/src/qpid/client/Dispatcher.cpp +++ b/cpp/src/qpid/client/Dispatcher.cpp @@ -49,7 +49,10 @@ void Subscriber::received(Message& msg) } Dispatcher::Dispatcher(const Session& s, const std::string& q) - : session(s), running(false), autoStop(true) + : session(s), + running(false), + autoStop(true), + failoverHandler(0) { queue = q.empty() ? session.getExecution().getDemux().getDefault() : @@ -91,9 +94,20 @@ void Dispatcher::run() } session.sync(); // Make sure all our acks are received before returning. } - catch (const ClosedException&) {} //ignore it and return + catch (const ClosedException& e) + { + QPID_LOG(debug, "Ignored exception in client dispatch thread: " << e.what()); + } //ignore it and return catch (const std::exception& e) { QPID_LOG(error, "Exception in client dispatch thread: " << e.what()); + if ( failoverHandler ) + { + failoverHandler(); + } + else + { + QPID_LOG(info, "No dispatcher failover handler registered."); + } } } diff --git a/cpp/src/qpid/client/Dispatcher.h b/cpp/src/qpid/client/Dispatcher.h index 7d42bf8793..d85785ed2c 100644 --- a/cpp/src/qpid/client/Dispatcher.h +++ b/cpp/src/qpid/client/Dispatcher.h @@ -69,6 +69,8 @@ class Dispatcher : public sys::Runnable Subscriber::shared_ptr find(const std::string& name); bool isStopped(); + boost::function<void ()> failoverHandler; + public: Dispatcher(const Session& session, const std::string& queue = ""); @@ -77,6 +79,11 @@ public: void stop(); void setAutoStop(bool b); + void registerFailoverHandler ( boost::function<void ()> fh ) + { + failoverHandler = fh; + } + void listen(MessageListener* listener, AckPolicy autoAck=AckPolicy()); void listen(const std::string& destination, MessageListener* listener, AckPolicy autoAck=AckPolicy()); void cancel(const std::string& destination); diff --git a/cpp/src/qpid/client/FailoverConnection.cpp b/cpp/src/qpid/client/FailoverConnection.cpp new file mode 100644 index 0000000000..cac680295d --- /dev/null +++ b/cpp/src/qpid/client/FailoverConnection.cpp @@ -0,0 +1,185 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + + +#include "qpid/log/Statement.h" +#include "qpid/client/FailoverConnection.h" +#include "qpid/client/ConnectionSettings.h" + +#include <iostream> +#include <fstream> + +using namespace std; + + +namespace qpid { +namespace client { + + +FailoverConnection::FailoverConnection ( ) : + name(), + failoverCompleteTime(0) +{ + connection.registerFailureCallback + ( boost::bind(&FailoverConnection::failover, this)); +} + +FailoverConnection::~FailoverConnection () {} + +void +FailoverConnection::open ( const std::string& host, + int port, + const std::string& uid, + const std::string& pwd, + const std::string& virtualhost, + uint16_t maxFrameSize +) +{ + ConnectionSettings settings; + + settings.host = host; + settings.port = port; + settings.username = uid; + settings.username = uid; + settings.password = pwd; + settings.virtualhost = virtualhost; + settings.maxFrameSize = maxFrameSize; + settings.host = host; + + open ( settings ); +} + + +void +FailoverConnection::open ( ConnectionSettings & settings ) +{ + connection.open ( settings ); +} + + + +void +FailoverConnection::close ( ) +{ + connection.close(); +} + + + +FailoverSession * +FailoverConnection::newSession ( const std::string& /* name */ ) +{ + FailoverSession * fs = new FailoverSession; + sessions.push_back ( fs ); + fs->session = connection.newSession(); + return fs; +} + + + +void +FailoverConnection::resume ( FailoverSession & failoverSession ) +{ + connection.resume ( failoverSession.session ); +} + + +bool +FailoverConnection::isOpen() const +{ + return connection.isOpen(); +} + + +void +FailoverConnection::getKnownBrokers ( std::vector<std::string> & /*v*/ ) +{ +} + + +void +FailoverConnection::registerFailureCallback ( boost::function<void ()> /*fn*/ ) +{ +} + +void +FailoverConnection::failover ( ) +{ + std::vector<Url> knownBrokers = connection.getKnownBrokers(); + if (knownBrokers.empty()) + throw Exception(QPID_MSG("FailoverConnection::failover " << name << " no known brokers.")); + + Connection newConnection; + for (std::vector<Url>::iterator i = knownBrokers.begin(); i != knownBrokers.end(); ++i) { + try { + newConnection.open(*i); + break; + } + catch (const std::exception& e) { + QPID_LOG(info, "Could not fail-over to " << *i << ": " << e.what()); + if ((i + 1) == knownBrokers.end()) + throw; + } + } + + /* + * We have a valid new connection. Tell all the sessions + * (and, through them, their SessionManagers and whatever else) + * that we are about to failover to this new Connection. + */ + // FIXME mgoulish -- get rid of two-passes here. + std::vector<FailoverSession *>::iterator sessions_iterator; + + for ( sessions_iterator = sessions.begin(); + sessions_iterator < sessions.end(); + ++ sessions_iterator + ) + { + FailoverSession * fs = * sessions_iterator; + fs->prepareForFailover ( newConnection ); + } + + /* + * Tell all sessions to actually failover to the new connection. + */ + for ( sessions_iterator = sessions.begin(); + sessions_iterator < sessions.end(); + ++ sessions_iterator + ) + { + FailoverSession * fs = * sessions_iterator; + fs->failover ( ); + } + + connection = newConnection; + + // FIXME aconway 2008-10-09: use sys/Time.h functions. + if ( failoverCompleteTime ) + { + gettimeofday ( failoverCompleteTime, 0 ); + } +} + + + + +}} // namespace qpid::client diff --git a/cpp/src/qpid/client/FailoverConnection.h b/cpp/src/qpid/client/FailoverConnection.h new file mode 100644 index 0000000000..1cec0bfd5b --- /dev/null +++ b/cpp/src/qpid/client/FailoverConnection.h @@ -0,0 +1,103 @@ +#ifndef QPID_CLIENT_FAILOVERCONNECTION_H +#define QPID_CLIENT_FAILOVERCONNECTION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +#include <string> + +#include "qpid/client/Connection.h" +#include "qpid/client/FailoverConnection.h" +#include "qpid/client/FailoverSession.h" +#include "qpid/client/FailoverSubscriptionManager.h" + + + +namespace qpid { +namespace client { + +class ConnectionSettings; + + +class FailoverConnection +{ + public: + + FailoverConnection ( ); + + ~FailoverConnection ( ); + + void open ( const std::string& host, + int port, + const std::string& uid = "guest", + const std::string& pwd = "guest", + const std::string& virtualhost = "/", + uint16_t maxFrameSize=65535 + ); + + void open ( ConnectionSettings & settings ); + + void close ( ); + + FailoverSession * newSession ( const std::string& name = std::string() ); + + void resume ( FailoverSession & session ); + + bool isOpen() const; + + void getKnownBrokers ( std::vector<std::string> & v ); + + + // public interface specific to Failover: + + void registerFailureCallback ( boost::function<void ()> fn ); + + // If you have more than 1 connection and you want to give them + // separate names for debugging... + std::string name; + + void failover ( ); + + struct timeval * failoverCompleteTime; + + + private: + + std::string host; + + Connection connection; + + int currentPortNumber; + + boost::function<void ()> clientFailoverCallback; + + std::vector<FailoverSession *> sessions; + + + friend class FailoverSession; + friend class FailoverSessionManager; +}; + +}} // namespace qpid::client + + +#endif /*!QPID_CLIENT_FAILOVERCONNECTION_H*/ diff --git a/cpp/src/qpid/client/FailoverSession.cpp b/cpp/src/qpid/client/FailoverSession.cpp new file mode 100644 index 0000000000..1e20edde4a --- /dev/null +++ b/cpp/src/qpid/client/FailoverSession.cpp @@ -0,0 +1,592 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <iostream> +#include <fstream> + + +#include "qpid/log/Logger.h" +#include "qpid/log/Options.h" +#include "qpid/log/Statement.h" + +#include "qpid/client/FailoverConnection.h" +#include "qpid/client/FailoverSession.h" + + +using namespace std; + + +namespace qpid { +namespace client { + +FailoverSession::FailoverSession ( ) : + name("no_name") +{ + // The session is created by FailoverConnection::newSession + failoverSubscriptionManager = 0; +} + + +FailoverSession::~FailoverSession ( ) +{ +} + + +framing::FrameSet::shared_ptr +FailoverSession::get() +{ + return session.get(); +} + + +SessionId +FailoverSession::getId() const +{ + return session.getId(); +} + + +void +FailoverSession::close() +{ + session.close(); +} + + +void +FailoverSession::sync() +{ + session.sync(); +} + + +uint32_t +FailoverSession::timeout(uint32_t /*seconds*/ ) +{ + // MICK WTF? return session.timeout ( seconds ); + return 0; +} + + +Execution& +FailoverSession::getExecution() +{ + return session.getExecution(); +} + + +void +FailoverSession::flush() +{ + session.flush(); +} + + +void +FailoverSession::markCompleted(const framing::SequenceNumber& id, + bool cumulative, + bool notifyPeer + ) +{ + session.markCompleted ( id, cumulative, notifyPeer ); +} + + + +// Wrapped functions from Session ---------------------------- + +void +FailoverSession::executionSync() +{ + session.executionSync(); +} + + + +void +FailoverSession::executionResult ( const SequenceNumber& commandId, + const string& value + ) +{ + session.executionResult ( commandId, + value + ); +} + + + +void +FailoverSession::executionException ( uint16_t errorCode, + const SequenceNumber& commandId, + uint8_t classCode, + uint8_t commandCode, + uint8_t fieldIndex, + const string& description, + const FieldTable& errorInfo + ) +{ + session.executionException ( errorCode, + commandId, + classCode, + commandCode, + fieldIndex, + description, + errorInfo + ); +} + + + +void +FailoverSession::messageTransfer ( const string& destination, + uint8_t acceptMode, + uint8_t acquireMode, + const MethodContent& content + ) +{ + session.messageTransfer ( destination, + acceptMode, + acquireMode, + content + ); +} + + + +void +FailoverSession::messageAccept ( const SequenceSet& transfers ) +{ + session.messageAccept ( transfers ); +} + + + +void +FailoverSession::messageReject ( const SequenceSet& transfers, + uint16_t code, + const string& text + ) +{ + session.messageReject ( transfers, + code, + text + ); +} + + + +void +FailoverSession::messageRelease ( const SequenceSet& transfers, + bool setRedelivered + ) +{ + session.messageRelease ( transfers, + setRedelivered + ); +} + + + +qpid::framing::MessageAcquireResult +FailoverSession::messageAcquire ( const SequenceSet& transfers ) +{ + return session.messageAcquire ( transfers ); +} + + + +qpid::framing::MessageResumeResult +FailoverSession::messageResume ( const string& destination, + const string& resumeId + ) +{ + return session.messageResume ( destination, + resumeId + ); +} + + + +void +FailoverSession::messageSubscribe ( const string& queue, + const string& destination, + uint8_t acceptMode, + uint8_t acquireMode, + bool exclusive, + const string& resumeId, + uint64_t resumeTtl, + const FieldTable& arguments + ) +{ + session.messageSubscribe ( queue, + destination, + acceptMode, + acquireMode, + exclusive, + resumeId, + resumeTtl, + arguments + ); +} + + + +void +FailoverSession::messageCancel ( const string& destinations ) +{ + session.messageCancel ( destinations ); +} + + + +void +FailoverSession::messageSetFlowMode ( const string& destination, + uint8_t flowMode + ) +{ + session.messageSetFlowMode ( destination, + flowMode + ); +} + + + +void +FailoverSession::messageFlow(const string& destination, + uint8_t unit, + uint32_t value) +{ + session.messageFlow ( destination, + unit, + value + ); +} + + + +void +FailoverSession::messageFlush(const string& destination) +{ + session.messageFlush ( destination ); +} + + + +void +FailoverSession::messageStop(const string& destination) +{ + session.messageStop ( destination ); +} + + + +void +FailoverSession::txSelect() +{ + session.txSelect ( ); +} + + + +void +FailoverSession::txCommit() +{ + session.txCommit ( ); +} + + + +void +FailoverSession::txRollback() +{ + session.txRollback ( ); +} + + + +void +FailoverSession::dtxSelect() +{ + session.dtxSelect ( ); +} + + + +qpid::framing::XaResult +FailoverSession::dtxStart(const Xid& xid, + bool join, + bool resume) +{ + return session.dtxStart ( xid, + join, + resume + ); +} + + + +qpid::framing::XaResult +FailoverSession::dtxEnd(const Xid& xid, + bool fail, + bool suspend) +{ + return session.dtxEnd ( xid, + fail, + suspend + ); +} + + + +qpid::framing::XaResult +FailoverSession::dtxCommit(const Xid& xid, + bool onePhase) +{ + return session.dtxCommit ( xid, + onePhase + ); +} + + + +void +FailoverSession::dtxForget(const Xid& xid) +{ + session.dtxForget ( xid ); +} + + + +qpid::framing::DtxGetTimeoutResult +FailoverSession::dtxGetTimeout(const Xid& xid) +{ + return session.dtxGetTimeout ( xid ); +} + + + +qpid::framing::XaResult +FailoverSession::dtxPrepare(const Xid& xid) +{ + return session.dtxPrepare ( xid ); +} + + + +qpid::framing::DtxRecoverResult +FailoverSession::dtxRecover() +{ + return session.dtxRecover ( ); +} + + + +qpid::framing::XaResult +FailoverSession::dtxRollback(const Xid& xid) +{ + return session.dtxRollback ( xid ); +} + + + +void +FailoverSession::dtxSetTimeout(const Xid& xid, + uint32_t timeout) +{ + session.dtxSetTimeout ( xid, + timeout + ); +} + + + +void +FailoverSession::exchangeDeclare(const string& exchange, + const string& type, + const string& alternateExchange, + bool passive, + bool durable, + bool autoDelete, + const FieldTable& arguments) +{ + session.exchangeDeclare ( exchange, + type, + alternateExchange, + passive, + durable, + autoDelete, + arguments + ); +} + + + +void +FailoverSession::exchangeDelete(const string& exchange, + bool ifUnused) +{ + session.exchangeDelete ( exchange, + ifUnused + ); +} + + + +qpid::framing::ExchangeQueryResult +FailoverSession::exchangeQuery(const string& name) +{ + return session.exchangeQuery ( name ); +} + + + +void +FailoverSession::exchangeBind(const string& queue, + const string& exchange, + const string& bindingKey, + const FieldTable& arguments) +{ + session.exchangeBind ( queue, + exchange, + bindingKey, + arguments + ); +} + + + +void +FailoverSession::exchangeUnbind(const string& queue, + const string& exchange, + const string& bindingKey) +{ + session.exchangeUnbind ( queue, + exchange, + bindingKey + ); +} + + + +qpid::framing::ExchangeBoundResult +FailoverSession::exchangeBound(const string& exchange, + const string& queue, + const string& bindingKey, + const FieldTable& arguments) +{ + return session.exchangeBound ( exchange, + queue, + bindingKey, + arguments + ); +} + + + +void +FailoverSession::queueDeclare(const string& queue, + const string& alternateExchange, + bool passive, + bool durable, + bool exclusive, + bool autoDelete, + const FieldTable& arguments) +{ + session.queueDeclare ( queue, + alternateExchange, + passive, + durable, + exclusive, + autoDelete, + arguments + ); +} + + + +void +FailoverSession::queueDelete(const string& queue, + bool ifUnused, + bool ifEmpty) +{ + session.queueDelete ( queue, + ifUnused, + ifEmpty + ); +} + + + +void +FailoverSession::queuePurge(const string& queue) +{ + session.queuePurge ( queue) ; +} + + + +qpid::framing::QueueQueryResult +FailoverSession::queueQuery(const string& queue) +{ + return session.queueQuery ( queue ); +} + + +// end Wrapped functions from Session --------------------------- + + +// Get ready for a failover. +void +FailoverSession::prepareForFailover ( Connection newConnection ) +{ + try + { + newSession = newConnection.newSession(); + } + catch ( const std::exception& error ) + { + throw Exception(QPID_MSG("Can't create failover session.")); + } + + + if ( failoverSubscriptionManager ) + { + failoverSubscriptionManager->prepareForFailover ( newSession ); + } +} + + + +void +FailoverSession::failover ( ) +{ + if ( failoverSubscriptionManager ) + { + failoverSubscriptionManager->failover ( ); + } + + session = newSession; +} + + + + +}} // namespace qpid::client diff --git a/cpp/src/qpid/client/FailoverSession.h b/cpp/src/qpid/client/FailoverSession.h new file mode 100644 index 0000000000..713c72e460 --- /dev/null +++ b/cpp/src/qpid/client/FailoverSession.h @@ -0,0 +1,314 @@ +#ifndef QPID_CLIENT_FAILOVERSESSION_H +#define QPID_CLIENT_FAILOVERSESSION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/client/Session.h" +#include "qpid/SessionId.h" +#include "qpid/framing/amqp_structs.h" +#include "qpid/framing/ProtocolVersion.h" +#include "qpid/framing/MethodContent.h" +#include "qpid/framing/TransferContent.h" +#include "qpid/client/Completion.h" +#include "qpid/client/Connection.h" +#include "qpid/client/ConnectionImpl.h" +#include "qpid/client/Execution.h" +#include "qpid/client/SessionImpl.h" +#include "qpid/client/TypedResult.h" +#include "qpid/shared_ptr.h" +#include <string> + + + + +namespace qpid { +namespace client { + + +class FailoverConnection; +class FailoverSubscriptionManager; + + +class FailoverSession +{ + public: + + typedef framing::TransferContent DefaultContent; + + FailoverSession ( ); + ~FailoverSession ( ); + + std::string name; + + framing::FrameSet::shared_ptr get(); + + SessionId getId() const; + + void close(); + + void sync(); + + uint32_t timeout ( uint32_t seconds); + + Execution& getExecution(); + + void flush(); + + void markCompleted(const framing::SequenceNumber& id, + bool cumulative, + bool notifyPeer + ); + + void sendCompletion ( ); + + + + // Wrapped functions from Session ---------------------------- + + void + executionSync(); + + + void + executionResult(const SequenceNumber& commandId=SequenceNumber(), + const string& value=string()); + + + void + executionException(uint16_t errorCode=0, + const SequenceNumber& commandId=SequenceNumber(), + uint8_t classCode=0, + uint8_t commandCode=0, + uint8_t fieldIndex=0, + const string& description=string(), + const FieldTable& errorInfo=FieldTable()); + + + void + messageTransfer(const string& destination=string(), + uint8_t acceptMode=1, + uint8_t acquireMode=0, + const MethodContent& content=DefaultContent(std::string())); + + + void + messageAccept(const SequenceSet& transfers=SequenceSet()); + + + void + messageReject(const SequenceSet& transfers=SequenceSet(), + uint16_t code=0, + const string& text=string()); + + + void + messageRelease(const SequenceSet& transfers=SequenceSet(), + bool setRedelivered=false); + + + qpid::framing::MessageAcquireResult + messageAcquire(const SequenceSet& transfers=SequenceSet()); + + + qpid::framing::MessageResumeResult + messageResume(const string& destination=string(), + const string& resumeId=string()); + + + void + messageSubscribe(const string& queue=string(), + const string& destination=string(), + uint8_t acceptMode=0, + uint8_t acquireMode=0, + bool exclusive=false, + const string& resumeId=string(), + uint64_t resumeTtl=0, + const FieldTable& arguments=FieldTable()); + + + void + messageCancel(const string& destination=string()); + + + void + messageSetFlowMode(const string& destination=string(), + uint8_t flowMode=0); + + + void + messageFlow(const string& destination=string(), + uint8_t unit=0, + uint32_t value=0); + + + void + messageFlush(const string& destination=string()); + + + void + messageStop(const string& destination=string()); + + + void + txSelect(); + + + void + txCommit(); + + + void + txRollback(); + + + void + dtxSelect(); + + + qpid::framing::XaResult + dtxStart(const Xid& xid=Xid(), + bool join=false, + bool resume=false); + + + qpid::framing::XaResult + dtxEnd(const Xid& xid=Xid(), + bool fail=false, + bool suspend=false); + + + qpid::framing::XaResult + dtxCommit(const Xid& xid=Xid(), + bool onePhase=false); + + + void + dtxForget(const Xid& xid=Xid()); + + + qpid::framing::DtxGetTimeoutResult + dtxGetTimeout(const Xid& xid=Xid()); + + + qpid::framing::XaResult + dtxPrepare(const Xid& xid=Xid()); + + + qpid::framing::DtxRecoverResult + dtxRecover(); + + + qpid::framing::XaResult + dtxRollback(const Xid& xid=Xid()); + + + void + dtxSetTimeout(const Xid& xid=Xid(), + uint32_t timeout=0); + + + void + exchangeDeclare(const string& exchange=string(), + const string& type=string(), + const string& alternateExchange=string(), + bool passive=false, + bool durable=false, + bool autoDelete=false, + const FieldTable& arguments=FieldTable()); + + + void + exchangeDelete(const string& exchange=string(), + bool ifUnused=false); + + + qpid::framing::ExchangeQueryResult + exchangeQuery(const string& name=string()); + + + void + exchangeBind(const string& queue=string(), + const string& exchange=string(), + const string& bindingKey=string(), + const FieldTable& arguments=FieldTable()); + + + void + exchangeUnbind(const string& queue=string(), + const string& exchange=string(), + const string& bindingKey=string()); + + + qpid::framing::ExchangeBoundResult + exchangeBound(const string& exchange=string(), + const string& queue=string(), + const string& bindingKey=string(), + const FieldTable& arguments=FieldTable()); + + + void + queueDeclare(const string& queue=string(), + const string& alternateExchange=string(), + bool passive=false, + bool durable=false, + bool exclusive=false, + bool autoDelete=false, + const FieldTable& arguments=FieldTable()); + + + void + queueDelete(const string& queue=string(), + bool ifUnused=false, + bool ifEmpty=false); + + + void + queuePurge(const string& queue=string()); + + + qpid::framing::QueueQueryResult + queueQuery(const string& queue=string()); + + // end Wrapped functions from Session --------------------------- + + // Tells the FailoverSession to get ready for a failover. + void prepareForFailover ( Connection newConnection ); + + void failover ( ); + + FailoverSubscriptionManager * failoverSubscriptionManager; + + + private: + + + Session session; + Session newSession; + + friend class FailoverConnection; + friend class FailoverSubscriptionManager; +}; + +}} // namespace qpid::client + + +#endif /*!QPID_CLIENT_FAILOVERSESSION_H*/ diff --git a/cpp/src/qpid/client/FailoverSubscriptionManager.cpp b/cpp/src/qpid/client/FailoverSubscriptionManager.cpp new file mode 100644 index 0000000000..2b108c1303 --- /dev/null +++ b/cpp/src/qpid/client/FailoverSubscriptionManager.cpp @@ -0,0 +1,332 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/client/FailoverSession.h" +#include "qpid/client/FailoverSubscriptionManager.h" + + + +using namespace std; + + +namespace qpid { +namespace client { + + + +FailoverSubscriptionManager::FailoverSubscriptionManager ( FailoverSession * fs) : + name("no_name"), + newSessionIsValid(false) +{ + subscriptionManager = new SubscriptionManager(fs->session); + fs->failoverSubscriptionManager = this; +} + + + +void +FailoverSubscriptionManager::prepareForFailover ( Session _newSession ) +{ + newSession = _newSession; + newSessionIsValid = true; +} + + + +void +FailoverSubscriptionManager::failover ( ) +{ + subscriptionManager->stop(); + // TODO -- save vector of boost bind fns. +} + + + + +FailoverSubscriptionManager::subscribeArgs::subscribeArgs + ( int _interface, + MessageListener * _listener, + LocalQueue * _localQueue, + const std::string * _queue, + const FlowControl * _flow, + const std::string * _tag + ) : + interface(_interface), + listener(_listener), + localQueue(_localQueue), + queue(_queue), + flow(_flow), + tag(_tag) +{ +} + + + + +void +FailoverSubscriptionManager::subscribe ( MessageListener & listener, + const std::string & queue, + const FlowControl & flow, + const std::string & tag + ) +{ + subscriptionManager->subscribe ( listener, + queue, + flow, + tag + ); + subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const FlowControl&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, flow, tag ) ); +} + + + +void +FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue, + const std::string & queue, + const FlowControl & flow, + const std::string & tag + ) +{ + subscriptionManager->subscribe ( localQueue, + queue, + flow, + tag + ); + subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const FlowControl&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, flow, tag ) ); +} + + + +void +FailoverSubscriptionManager::subscribe ( MessageListener & listener, + const std::string & queue, + const std::string & tag + ) +{ + subscriptionManager->subscribe ( listener, + queue, + tag + ); + // TODO -- more than one subscription + subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, tag ) ); +} + + + + +void +FailoverSubscriptionManager::subscribe ( LocalQueue & localQueue, + const std::string & queue, + const std::string & tag + ) +{ + subscriptionManager->subscribe ( localQueue, + queue, + tag + ); + subscribeFns.push_back ( boost::bind ( (void (FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, queue, tag ) ); +} + + + +bool +FailoverSubscriptionManager::get ( Message & result, + const std::string & queue, + sys::Duration timeout + ) +{ + return subscriptionManager->get ( result, queue, timeout ); +} + + + +void +FailoverSubscriptionManager::cancel ( const std::string tag ) +{ + subscriptionManager->cancel ( tag ); +} + + + +void +FailoverSubscriptionManager::run ( ) // User Thread +{ + // FIXME mgoulish -- wait on a monitor here instead of this infinite loop + while ( 1 ) + { + subscriptionManager->run ( ); + + // When we drop out of run, if there is a new Session + // waiting for us, this is a failover. Otherwise, just + // return control to usercode. + sleep(1); // FIXME mgoulish -- get rid of this when we have wait-on-monitor. + + if ( newSessionIsValid ) + { + delete subscriptionManager; + subscriptionManager = new SubscriptionManager(newSession); + // FIXME mgoulish make this an array of boost bind fns + // + for ( std::vector<subscribeFn>::iterator i = subscribeFns.begin(); + i < subscribeFns.end(); + ++ i + ) + { + std::cerr << "MDEBUG new new resubscribe.\n"; + (*i) (); + } + + newSessionIsValid = false; + } + else + { + // break; TODO -- fix this + } + } + +} + + + +void +FailoverSubscriptionManager::start ( ) +{ + subscriptionManager->start ( ); +} + + + +void +FailoverSubscriptionManager::setAutoStop ( bool set ) +{ + subscriptionManager->setAutoStop ( set ); +} + + + +void +FailoverSubscriptionManager::stop ( ) +{ + subscriptionManager->stop ( ); +} + + + +void +FailoverSubscriptionManager::setFlowControl ( const std::string & destination, + const FlowControl & flow + ) +{ + subscriptionManager->setFlowControl ( destination, flow ); +} + + + +void +FailoverSubscriptionManager::setFlowControl ( const FlowControl & flow ) +{ + subscriptionManager->setFlowControl ( flow ); +} + + + +const FlowControl & +FailoverSubscriptionManager::getFlowControl ( ) const +{ + return subscriptionManager->getFlowControl ( ); +} + + + + +void +FailoverSubscriptionManager::setFlowControl ( const std::string & tag, + uint32_t messages, + uint32_t bytes, + bool window + ) +{ + subscriptionManager->setFlowControl ( tag, + messages, + bytes, + window + ); +} + + + +void +FailoverSubscriptionManager::setFlowControl ( uint32_t messages, + uint32_t bytes, + bool window + ) +{ + subscriptionManager->setFlowControl ( messages, + bytes, + window + ); +} + + + +void +FailoverSubscriptionManager::setAcceptMode ( bool required ) +{ + subscriptionManager->setAcceptMode ( required ); +} + + + +void +FailoverSubscriptionManager::setAcquireMode ( bool acquire ) +{ + subscriptionManager->setAcquireMode ( acquire ); +} + + + +void +FailoverSubscriptionManager::setAckPolicy ( const AckPolicy & autoAck ) +{ + subscriptionManager->setAckPolicy ( autoAck ); +} + + + +AckPolicy & +FailoverSubscriptionManager::getAckPolicy() +{ + return subscriptionManager->getAckPolicy ( ); +} + + + +void +FailoverSubscriptionManager::registerFailoverHandler ( boost::function<void ()> /* fh */ ) +{ + // FIXME mgoulish -- get rid of this mechanism -- i think it's unused. +} + + + + + +}} // namespace qpid::client diff --git a/cpp/src/qpid/client/FailoverSubscriptionManager.h b/cpp/src/qpid/client/FailoverSubscriptionManager.h new file mode 100644 index 0000000000..fe96742876 --- /dev/null +++ b/cpp/src/qpid/client/FailoverSubscriptionManager.h @@ -0,0 +1,162 @@ +#ifndef QPID_CLIENT_FAILOVERSUBSCRIPTIONMANAGER_H +#define QPID_CLIENT_FAILOVERSUBSCRIPTIONMANAGER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +#include "qpid/sys/Mutex.h" +#include <qpid/client/Dispatcher.h> +#include <qpid/client/Completion.h> +#include <qpid/client/Session.h> +#include <qpid/client/FailoverSession.h> +#include <qpid/client/MessageListener.h> +#include <qpid/client/SubscriptionManager.h> +#include <qpid/client/LocalQueue.h> +#include <qpid/client/FlowControl.h> +#include <qpid/sys/Runnable.h> + + + + +namespace qpid { +namespace client { + + +class FailoverSubscriptionManager +{ + public: + + FailoverSubscriptionManager ( FailoverSession * fs ); + + void foo ( int& arg_1 ); + + void subscribe ( MessageListener & listener, + const std::string & queue, + const FlowControl & flow, + const std::string & tag = std::string() ); + + void subscribe ( LocalQueue & localQueue, + const std::string & queue, + const FlowControl & flow, + const std::string & tag=std::string()); + + void subscribe ( MessageListener & listener, + const std::string & queue, + const std::string & tag = std::string()); + + void subscribe ( LocalQueue & localQueue, + const std::string & queue, + const std::string & tag=std::string()); + + bool get ( Message & result, + const std::string & queue, + sys::Duration timeout=0); + + void cancel ( const std::string tag ); + + void run ( ); + + void start ( ); + + void setAutoStop ( bool set = true ); + + void stop ( ); + + void setFlowControl ( const std::string & destintion, + const FlowControl & flow ); + + void setFlowControl ( const FlowControl & flow ); + + const FlowControl & getFlowControl ( ) const; + + void setFlowControl ( const std::string & tag, + uint32_t messages, + uint32_t bytes, + bool window=true ); + + void setFlowControl ( uint32_t messages, + uint32_t bytes, + bool window = true + ); + + void setAcceptMode ( bool required ); + + void setAcquireMode ( bool acquire ); + + void setAckPolicy ( const AckPolicy & autoAck ); + + AckPolicy & getAckPolicy(); + + void registerFailoverHandler ( boost::function<void ()> fh ); + + // Get ready for a failover. + void prepareForFailover ( Session newSession ); + void failover ( ); + + std::string name; + + + private: + + SubscriptionManager * subscriptionManager; + + MessageListener * savedListener; + std::string savedQueue, + savedTag; + + friend class FailoverConnection; + friend class FailoverSession; + + Session newSession; + bool newSessionIsValid; + + /* + * */ + typedef boost::function<void ()> subscribeFn; + std::vector < subscribeFn > subscribeFns; + + struct subscribeArgs + { + int interface; + MessageListener * listener; + LocalQueue * localQueue; + const std::string * queue; + const FlowControl * flow; + const std::string * tag; + + subscribeArgs ( int _interface, + MessageListener *, + LocalQueue *, + const std::string *, + const FlowControl *, + const std::string * + ); + }; + + std::vector < subscribeArgs * > subscriptionReplayVector; + +}; + +}} // namespace qpid::client + + +#endif /*!QPID_CLIENT_FAILOVERSUBSCRIPTIONMANAGER_H*/ diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp index e4d68e723f..dde93635c8 100644 --- a/cpp/src/qpid/client/SubscriptionManager.cpp +++ b/cpp/src/qpid/client/SubscriptionManager.cpp @@ -147,6 +147,12 @@ bool SubscriptionManager::get(Message& result, const std::string& queue, sys::Du return lq.get(result, 0); } +Session SubscriptionManager::getSession() const { return session; } + +void SubscriptionManager::registerFailoverHandler (boost::function<void ()> fh) { + dispatcher.registerFailoverHandler(fh); +} + }} // namespace qpid::client #endif diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h index c50e67effa..10dd8b8da3 100644 --- a/cpp/src/qpid/client/SubscriptionManager.h +++ b/cpp/src/qpid/client/SubscriptionManager.h @@ -190,7 +190,7 @@ class SubscriptionManager : public sys::Runnable /** Set the acquire-mode for new subscriptions. Defaults to false. *@param acquire: if false messages pre-acquired, if true * messages are dequed on acknowledgement or on transfer - * depending on acceptMode. + * depending on acceptMode. */ void setAcquireMode(bool acquire); @@ -199,9 +199,11 @@ class SubscriptionManager : public sys::Runnable */ void setAckPolicy(const AckPolicy& autoAck); - AckPolicy& getAckPolicy(); + AckPolicy& getAckPolicy(); - Session getSession() const { return session; } + void registerFailoverHandler ( boost::function<void ()> fh ); + + Session getSession() const; }; /** AutoCancel cancels a subscription in its destructor */ diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index f4d75b7b6b..d63ad9646b 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -110,6 +110,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : // FIXME aconway 2008-09-24: // if first cluster up set new UUID to set_clusterID() else set UUID of cluster being joined. } + broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); failoverExchange.reset(new FailoverExchange(this)); broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); cpgDispatchHandle.startWatch(poller); |