summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-11-04 13:59:44 +0000
committerGordon Sim <gsim@apache.org>2008-11-04 13:59:44 +0000
commitd273ece322f7abd39c7c62307abad0d89371f409 (patch)
tree747bc209e6625a0662a87a96a74a8bba1dd0265a
parent95a5154a508d0e64e888aa948b8e4cfe9c581fe9 (diff)
downloadqpid-python-d273ece322f7abd39c7c62307abad0d89371f409.tar.gz
Adding a couple of utilities (don't alter any existing functionality) that are useful in applications handling failover.
Added example of their use that I've been using in testing. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@711256 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/examples/failover/Makefile.am21
-rw-r--r--cpp/examples/failover/replaying_sender.cpp97
-rw-r--r--cpp/examples/failover/resuming_receiver.cpp106
-rw-r--r--cpp/src/Makefile.am4
-rw-r--r--cpp/src/qpid/client/FailoverManager.cpp115
-rw-r--r--cpp/src/qpid/client/FailoverManager.h70
-rw-r--r--cpp/src/qpid/client/MessageReplayTracker.cpp81
-rw-r--r--cpp/src/qpid/client/MessageReplayTracker.h66
8 files changed, 554 insertions, 6 deletions
diff --git a/cpp/examples/failover/Makefile.am b/cpp/examples/failover/Makefile.am
index 8fe6b8cba7..36969dbd36 100644
--- a/cpp/examples/failover/Makefile.am
+++ b/cpp/examples/failover/Makefile.am
@@ -1,8 +1,9 @@
-examplesdir=$(pkgdatadir)/examples/direct
+examplesdir=$(pkgdatadir)/examples/failover
include $(top_srcdir)/examples/makedist.mk
-noinst_PROGRAMS=direct_producer listener declare_queues
+noinst_PROGRAMS=direct_producer listener declare_queues resuming_receiver replaying_sender
+
direct_producer_SOURCES=direct_producer.cpp
direct_producer_LDADD=$(CLIENT_LIB)
@@ -12,10 +13,18 @@ listener_LDADD=$(CLIENT_LIB)
declare_queues_SOURCES=declare_queues.cpp
declare_queues_LDADD=$(CLIENT_LIB)
-examples_DATA= \
- direct_producer.cpp \
- listener.cpp \
- declare_queues.cpp \
+resuming_receiver_SOURCES=resuming_receiver.cpp
+resuming_receiver_LDADD=$(CLIENT_LIB)
+
+replaying_sender_SOURCES=replaying_sender.cpp
+replaying_sender_LDADD=$(CLIENT_LIB)
+
+examples_DATA= \
+ direct_producer.cpp \
+ listener.cpp \
+ declare_queues.cpp \
+ resuming_receiver.cpp \
+ replaying_sender.cpp \
$(MAKEDIST)
# FIXME aconway 2008-10-10: add verify scripts.
diff --git a/cpp/examples/failover/replaying_sender.cpp b/cpp/examples/failover/replaying_sender.cpp
new file mode 100644
index 0000000000..22a7e1ebd3
--- /dev/null
+++ b/cpp/examples/failover/replaying_sender.cpp
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/client/FailoverManager.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/AsyncSession.h>
+#include <qpid/client/Message.h>
+#include <qpid/client/MessageReplayTracker.h>
+#include <qpid/Exception.h>
+
+#include <iostream>
+#include <sstream>
+
+using namespace qpid;
+using namespace qpid::client;
+using namespace qpid::framing;
+
+using namespace std;
+
+class Sender : public FailoverManager::Command
+{
+ public:
+ Sender(const std::string& queue, uint count);
+ void execute(AsyncSession& session, bool isRetry);
+ uint getSent();
+ private:
+ MessageReplayTracker sender;
+ const uint count;
+ uint sent;
+ Message message;
+
+};
+
+Sender::Sender(const std::string& queue, uint count_) : sender(10), count(count_), sent(0)
+{
+ message.getDeliveryProperties().setRoutingKey(queue);
+}
+
+void Sender::execute(AsyncSession& session, bool isRetry)
+{
+ if (isRetry) sender.replay(session);
+ else sender.init(session);
+ while (sent < count) {
+ stringstream message_data;
+ message_data << ++sent;
+ message.setData(message_data.str());
+ message.getHeaders().setInt("sn", sent);
+ sender.send(message);
+ if (count > 1000 && !(sent % 1000)) {
+ std::cout << "sent " << sent << " of " << count << std::endl;
+ }
+ }
+ message.setData("That's all, folks!");
+ sender.send(message);
+}
+
+uint Sender::getSent()
+{
+ return sent;
+}
+
+int main(int argc, char ** argv)
+{
+ ConnectionSettings settings;
+ if (argc > 1) settings.host = argv[1];
+ if (argc > 2) settings.port = atoi(argv[2]);
+
+ FailoverManager connection(settings);
+ Sender sender("message_queue", argc > 3 ? atoi(argv[3]) : 1000);
+ try {
+ connection.execute(sender);
+ std::cout << "Sent " << sender.getSent() << " messages." << std::endl;
+ connection.close();
+ return 0;
+ } catch(const std::exception& error) {
+ std::cout << "Failed: " << error.what() << std::endl;
+ }
+ return 1;
+}
diff --git a/cpp/examples/failover/resuming_receiver.cpp b/cpp/examples/failover/resuming_receiver.cpp
new file mode 100644
index 0000000000..6ac67d8edb
--- /dev/null
+++ b/cpp/examples/failover/resuming_receiver.cpp
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/client/FailoverManager.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/Message.h>
+#include <qpid/client/SubscriptionManager.h>
+
+#include <iostream>
+#include <fstream>
+
+
+using namespace qpid;
+using namespace qpid::client;
+using namespace qpid::framing;
+
+using namespace std;
+
+
+class Listener : public MessageListener, public FailoverManager::Command
+{
+ public:
+ Listener();
+ void received(Message& message);
+ void execute(AsyncSession& session, bool isRetry);
+ private:
+ Subscription subscription;
+ uint count;
+ uint skipped;
+ uint lastSn;
+};
+
+Listener::Listener() : count(0), skipped(0), lastSn(0) {}
+
+void Listener::received(Message & message)
+{
+ if (message.getData() == "That's all, folks!") {
+ std::cout << "Shutting down listener for " << message.getDestination()
+ << std::endl;
+
+ std::cout << "Listener received " << count << " messages (" << skipped << " skipped)" << std::endl;
+ subscription.cancel();
+ } else {
+ uint sn = message.getHeaders().getAsInt("sn");
+ if (lastSn < sn) {
+ if (sn - lastSn > 1) {
+ std::cout << "Warning: gap in sequence between " << lastSn << " and " << sn << std::endl;
+ }
+ lastSn = sn;
+ ++count;
+ } else {
+ ++skipped;
+ }
+ }
+}
+
+void Listener::execute(AsyncSession& session, bool isRetry)
+{
+ if (isRetry) {
+ std::cout << "Resuming from " << count << std::endl;
+ }
+ SubscriptionManager subs(session);
+ subscription = subs.subscribe(*this, "message_queue");
+ subs.run();
+}
+
+int main(int argc, char ** argv)
+{
+ ConnectionSettings settings;
+ if (argc > 1) settings.host = argv[1];
+ if (argc > 2) settings.port = atoi(argv[2]);
+
+ FailoverManager connection(settings);
+ Listener listener;
+
+ try {
+ connection.execute(listener);
+ connection.close();
+ std::cout << "Completed without error." << std::endl;
+ return 0;
+ } catch(const std::exception& error) {
+ std::cout << "Failure: " << error.what() << std::endl;
+ }
+ return 1;
+}
+
+
+
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 24d05af2d8..2b0becf836 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -381,6 +381,7 @@ libqpidclient_la_SOURCES = \
qpid/client/Demux.cpp \
qpid/client/Dispatcher.cpp \
qpid/client/FailoverConnection.cpp \
+ qpid/client/FailoverManager.cpp \
qpid/client/FailoverSession.cpp \
qpid/client/FailoverSubscriptionManager.cpp \
qpid/client/FailoverListener.h \
@@ -393,6 +394,7 @@ libqpidclient_la_SOURCES = \
qpid/client/LocalQueue.cpp \
qpid/client/Message.cpp \
qpid/client/MessageListener.cpp \
+ qpid/client/MessageReplayTracker.cpp \
qpid/client/QueueOptions.cpp \
qpid/client/Results.cpp \
qpid/client/SessionBase_0_10.cpp \
@@ -519,6 +521,7 @@ nobase_include_HEADERS = \
qpid/client/Dispatcher.h \
qpid/client/Execution.h \
qpid/client/FailoverConnection.h \
+ qpid/client/FailoverManager.h \
qpid/client/FailoverSession.h \
qpid/client/Subscription.h \
qpid/client/SubscriptionImpl.h \
@@ -533,6 +536,7 @@ nobase_include_HEADERS = \
qpid/client/QueueOptions.h \
qpid/client/Message.h \
qpid/client/MessageListener.h \
+ qpid/client/MessageReplayTracker.h \
qpid/client/Results.h \
qpid/client/SessionBase_0_10.h \
qpid/client/Session.h \
diff --git a/cpp/src/qpid/client/FailoverManager.cpp b/cpp/src/qpid/client/FailoverManager.cpp
new file mode 100644
index 0000000000..73c6bfc2de
--- /dev/null
+++ b/cpp/src/qpid/client/FailoverManager.cpp
@@ -0,0 +1,115 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "FailoverManager.h"
+#include "qpid/Exception.h"
+#include "qpid/log/Statement.h"
+
+
+namespace qpid {
+namespace client {
+
+using qpid::sys::Monitor;
+
+FailoverManager::FailoverManager(const ConnectionSettings& s) : settings(s), state(IDLE) {}
+
+void FailoverManager::execute(Command& c)
+{
+ bool retry = false;
+ bool completed = false;
+ while (!completed) {
+ try {
+ AsyncSession session = connect().newSession();
+ c.execute(session, retry);
+ session.sync();//TODO: shouldn't be required, but seems there is a bug in session
+ session.close();
+ completed = true;
+ } catch(const TransportFailure&) {
+ retry = true;
+ }
+ }
+}
+
+void FailoverManager::close()
+{
+ Monitor::ScopedLock l(lock);
+ connection.close();
+}
+
+Connection& FailoverManager::connect(std::vector<Url> brokers)
+{
+ Monitor::ScopedLock l(lock);
+ if (state == CANT_CONNECT) {
+ state = IDLE;//retry
+ }
+ while (!connection.isOpen()) {
+ if (state == CONNECTING) {
+ lock.wait();
+ } else if (state == CANT_CONNECT) {
+ throw CannotConnectException("Cannot establish a connection");
+ } else {
+ state = CONNECTING;
+ Connection c;
+ attempt(c, settings, brokers.empty() ? connection.getKnownBrokers() : brokers);
+ if (c.isOpen()) state = IDLE;
+ else state = CANT_CONNECT;
+ connection = c;
+ lock.notifyAll();
+ }
+ }
+ return connection;
+}
+
+Connection& FailoverManager::getConnection()
+{
+ Monitor::ScopedLock l(lock);
+ return connection;
+}
+
+void FailoverManager::attempt(Connection& c, ConnectionSettings s, std::vector<Url> urls)
+{
+ Monitor::ScopedUnlock u(lock);
+ if (urls.empty()) {
+ attempt(c, s);
+ } else {
+ for (std::vector<Url>::const_iterator i = urls.begin(); i != urls.end() && !c.isOpen(); ++i) {
+ for (Url::const_iterator j = i->begin(); j != i->end() && !c.isOpen(); ++j) {
+ const TcpAddress* tcp = j->get<TcpAddress>();
+ if (tcp) {
+ s.host = tcp->host;
+ s.port = tcp->port;
+ attempt(c, s);
+ }
+ }
+ }
+ }
+}
+
+void FailoverManager::attempt(Connection& c, ConnectionSettings s)
+{
+ try {
+ c.open(s);
+ } catch (const Exception& e) {
+ QPID_LOG(info, "Could not connect to " << s.host << " on " << s.port << ": " << e.what());
+ }
+}
+
+
+}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/FailoverManager.h b/cpp/src/qpid/client/FailoverManager.h
new file mode 100644
index 0000000000..d17b8371d0
--- /dev/null
+++ b/cpp/src/qpid/client/FailoverManager.h
@@ -0,0 +1,70 @@
+#ifndef QPID_CLIENT_FAILOVERMANAGER_H
+#define QPID_CLIENT_FAILOVERMANAGER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Connection.h"
+#include "ConnectionSettings.h"
+#include "qpid/Exception.h"
+#include "qpid/client/AsyncSession.h"
+#include "qpid/sys/Monitor.h"
+#include <vector>
+
+namespace qpid {
+namespace client {
+
+struct CannotConnectException : qpid::Exception
+{
+ CannotConnectException(const std::string& m) : qpid::Exception(m) {}
+};
+
+/**
+ * Utility to handle reconnection.
+ */
+class FailoverManager
+{
+ public:
+ struct Command
+ {
+ virtual void execute(AsyncSession& session, bool isRetry) = 0;
+ virtual ~Command() {}
+ };
+
+ FailoverManager(const ConnectionSettings& settings);
+ Connection& connect(std::vector<Url> brokers = std::vector<Url>());
+ Connection& getConnection();
+ void close();
+ void execute(Command&);
+ private:
+ enum State {IDLE, CONNECTING, CANT_CONNECT};
+
+ qpid::sys::Monitor lock;
+ Connection connection;
+ ConnectionSettings settings;
+ State state;
+
+ void attempt(Connection&, ConnectionSettings settings, std::vector<Url> urls);
+ void attempt(Connection&, ConnectionSettings settings);
+};
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_FAILOVERMANAGER_H*/
diff --git a/cpp/src/qpid/client/MessageReplayTracker.cpp b/cpp/src/qpid/client/MessageReplayTracker.cpp
new file mode 100644
index 0000000000..3c36b03b34
--- /dev/null
+++ b/cpp/src/qpid/client/MessageReplayTracker.cpp
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "MessageReplayTracker.h"
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace client {
+
+MessageReplayTracker::MessageReplayTracker(uint f) : flushInterval(f), count(0) {}
+
+void MessageReplayTracker::send(const Message& message, const std::string& destination)
+{
+ ReplayRecord record(message, destination);
+ record.send(*this);
+ if (flushInterval && ++count >= flushInterval) {
+ checkCompletion();
+ if (!buffer.empty()) session.flush();
+ }
+}
+void MessageReplayTracker::init(AsyncSession s)
+{
+ session = s;
+}
+
+void MessageReplayTracker::replay(AsyncSession s)
+{
+ session = s;
+ std::list<ReplayRecord> copy;
+ buffer.swap(copy);
+ std::for_each(copy.begin(), copy.end(), boost::bind(&ReplayRecord::send, _1, boost::ref(*this)));
+ session.flush();
+ count = 0;
+}
+
+void MessageReplayTracker::setFlushInterval(uint f)
+{
+ flushInterval = f;
+}
+
+uint MessageReplayTracker::getFlushInterval()
+{
+ return flushInterval;
+}
+
+void MessageReplayTracker::checkCompletion()
+{
+ buffer.remove_if(boost::bind(&ReplayRecord::isComplete, _1));
+}
+
+MessageReplayTracker::ReplayRecord::ReplayRecord(const Message& m, const std::string& d) : message(m), destination(d) {}
+
+void MessageReplayTracker::ReplayRecord::send(MessageReplayTracker& tracker)
+{
+ status = tracker.session.messageTransfer(arg::destination=destination, arg::content=message);
+ tracker.buffer.push_back(*this);
+}
+
+bool MessageReplayTracker::ReplayRecord::isComplete()
+{
+ return status.isComplete();
+}
+
+}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/MessageReplayTracker.h b/cpp/src/qpid/client/MessageReplayTracker.h
new file mode 100644
index 0000000000..40324de4e9
--- /dev/null
+++ b/cpp/src/qpid/client/MessageReplayTracker.h
@@ -0,0 +1,66 @@
+#ifndef QPID_CLIENT_MESSAGEREPLAYTRACKER_H
+#define QPID_CLIENT_MESSAGEREPLAYTRACKER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "AsyncSession.h"
+#include "Message.h"
+
+#include <list>
+#include <string>
+
+namespace qpid {
+namespace client {
+
+/**
+ * Utility to track messages sent asynchronously, allowing those that
+ * are indoubt to be replayed over a new session.
+ */
+class MessageReplayTracker
+{
+ public:
+ MessageReplayTracker(uint flushInterval);
+ void send(const Message& message, const std::string& destination = "");
+ void init(AsyncSession session);
+ void replay(AsyncSession session);
+ void setFlushInterval(uint interval);
+ uint getFlushInterval();
+ void checkCompletion();
+ private:
+ struct ReplayRecord
+ {
+ Completion status;
+ Message message;
+ std::string destination;
+
+ ReplayRecord(const Message& message, const std::string& destination);
+ void send(MessageReplayTracker&);
+ bool isComplete();
+ };
+
+ AsyncSession session;
+ uint flushInterval;
+ uint count;
+ std::list<ReplayRecord> buffer;
+};
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_MESSAGEREPLAYTRACKER_H*/