summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-11-04 13:59:44 +0000
committerGordon Sim <gsim@apache.org>2008-11-04 13:59:44 +0000
commitd273ece322f7abd39c7c62307abad0d89371f409 (patch)
tree747bc209e6625a0662a87a96a74a8bba1dd0265a /cpp/src
parent95a5154a508d0e64e888aa948b8e4cfe9c581fe9 (diff)
downloadqpid-python-d273ece322f7abd39c7c62307abad0d89371f409.tar.gz
Adding a couple of utilities (don't alter any existing functionality) that are useful in applications handling failover.
Added example of their use that I've been using in testing. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@711256 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am4
-rw-r--r--cpp/src/qpid/client/FailoverManager.cpp115
-rw-r--r--cpp/src/qpid/client/FailoverManager.h70
-rw-r--r--cpp/src/qpid/client/MessageReplayTracker.cpp81
-rw-r--r--cpp/src/qpid/client/MessageReplayTracker.h66
5 files changed, 336 insertions, 0 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 24d05af2d8..2b0becf836 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -381,6 +381,7 @@ libqpidclient_la_SOURCES = \
qpid/client/Demux.cpp \
qpid/client/Dispatcher.cpp \
qpid/client/FailoverConnection.cpp \
+ qpid/client/FailoverManager.cpp \
qpid/client/FailoverSession.cpp \
qpid/client/FailoverSubscriptionManager.cpp \
qpid/client/FailoverListener.h \
@@ -393,6 +394,7 @@ libqpidclient_la_SOURCES = \
qpid/client/LocalQueue.cpp \
qpid/client/Message.cpp \
qpid/client/MessageListener.cpp \
+ qpid/client/MessageReplayTracker.cpp \
qpid/client/QueueOptions.cpp \
qpid/client/Results.cpp \
qpid/client/SessionBase_0_10.cpp \
@@ -519,6 +521,7 @@ nobase_include_HEADERS = \
qpid/client/Dispatcher.h \
qpid/client/Execution.h \
qpid/client/FailoverConnection.h \
+ qpid/client/FailoverManager.h \
qpid/client/FailoverSession.h \
qpid/client/Subscription.h \
qpid/client/SubscriptionImpl.h \
@@ -533,6 +536,7 @@ nobase_include_HEADERS = \
qpid/client/QueueOptions.h \
qpid/client/Message.h \
qpid/client/MessageListener.h \
+ qpid/client/MessageReplayTracker.h \
qpid/client/Results.h \
qpid/client/SessionBase_0_10.h \
qpid/client/Session.h \
diff --git a/cpp/src/qpid/client/FailoverManager.cpp b/cpp/src/qpid/client/FailoverManager.cpp
new file mode 100644
index 0000000000..73c6bfc2de
--- /dev/null
+++ b/cpp/src/qpid/client/FailoverManager.cpp
@@ -0,0 +1,115 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "FailoverManager.h"
+#include "qpid/Exception.h"
+#include "qpid/log/Statement.h"
+
+
+namespace qpid {
+namespace client {
+
+using qpid::sys::Monitor;
+
+FailoverManager::FailoverManager(const ConnectionSettings& s) : settings(s), state(IDLE) {}
+
+void FailoverManager::execute(Command& c)
+{
+ bool retry = false;
+ bool completed = false;
+ while (!completed) {
+ try {
+ AsyncSession session = connect().newSession();
+ c.execute(session, retry);
+ session.sync();//TODO: shouldn't be required, but seems there is a bug in session
+ session.close();
+ completed = true;
+ } catch(const TransportFailure&) {
+ retry = true;
+ }
+ }
+}
+
+void FailoverManager::close()
+{
+ Monitor::ScopedLock l(lock);
+ connection.close();
+}
+
+Connection& FailoverManager::connect(std::vector<Url> brokers)
+{
+ Monitor::ScopedLock l(lock);
+ if (state == CANT_CONNECT) {
+ state = IDLE;//retry
+ }
+ while (!connection.isOpen()) {
+ if (state == CONNECTING) {
+ lock.wait();
+ } else if (state == CANT_CONNECT) {
+ throw CannotConnectException("Cannot establish a connection");
+ } else {
+ state = CONNECTING;
+ Connection c;
+ attempt(c, settings, brokers.empty() ? connection.getKnownBrokers() : brokers);
+ if (c.isOpen()) state = IDLE;
+ else state = CANT_CONNECT;
+ connection = c;
+ lock.notifyAll();
+ }
+ }
+ return connection;
+}
+
+Connection& FailoverManager::getConnection()
+{
+ Monitor::ScopedLock l(lock);
+ return connection;
+}
+
+void FailoverManager::attempt(Connection& c, ConnectionSettings s, std::vector<Url> urls)
+{
+ Monitor::ScopedUnlock u(lock);
+ if (urls.empty()) {
+ attempt(c, s);
+ } else {
+ for (std::vector<Url>::const_iterator i = urls.begin(); i != urls.end() && !c.isOpen(); ++i) {
+ for (Url::const_iterator j = i->begin(); j != i->end() && !c.isOpen(); ++j) {
+ const TcpAddress* tcp = j->get<TcpAddress>();
+ if (tcp) {
+ s.host = tcp->host;
+ s.port = tcp->port;
+ attempt(c, s);
+ }
+ }
+ }
+ }
+}
+
+void FailoverManager::attempt(Connection& c, ConnectionSettings s)
+{
+ try {
+ c.open(s);
+ } catch (const Exception& e) {
+ QPID_LOG(info, "Could not connect to " << s.host << " on " << s.port << ": " << e.what());
+ }
+}
+
+
+}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/FailoverManager.h b/cpp/src/qpid/client/FailoverManager.h
new file mode 100644
index 0000000000..d17b8371d0
--- /dev/null
+++ b/cpp/src/qpid/client/FailoverManager.h
@@ -0,0 +1,70 @@
+#ifndef QPID_CLIENT_FAILOVERMANAGER_H
+#define QPID_CLIENT_FAILOVERMANAGER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Connection.h"
+#include "ConnectionSettings.h"
+#include "qpid/Exception.h"
+#include "qpid/client/AsyncSession.h"
+#include "qpid/sys/Monitor.h"
+#include <vector>
+
+namespace qpid {
+namespace client {
+
+struct CannotConnectException : qpid::Exception
+{
+ CannotConnectException(const std::string& m) : qpid::Exception(m) {}
+};
+
+/**
+ * Utility to handle reconnection.
+ */
+class FailoverManager
+{
+ public:
+ struct Command
+ {
+ virtual void execute(AsyncSession& session, bool isRetry) = 0;
+ virtual ~Command() {}
+ };
+
+ FailoverManager(const ConnectionSettings& settings);
+ Connection& connect(std::vector<Url> brokers = std::vector<Url>());
+ Connection& getConnection();
+ void close();
+ void execute(Command&);
+ private:
+ enum State {IDLE, CONNECTING, CANT_CONNECT};
+
+ qpid::sys::Monitor lock;
+ Connection connection;
+ ConnectionSettings settings;
+ State state;
+
+ void attempt(Connection&, ConnectionSettings settings, std::vector<Url> urls);
+ void attempt(Connection&, ConnectionSettings settings);
+};
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_FAILOVERMANAGER_H*/
diff --git a/cpp/src/qpid/client/MessageReplayTracker.cpp b/cpp/src/qpid/client/MessageReplayTracker.cpp
new file mode 100644
index 0000000000..3c36b03b34
--- /dev/null
+++ b/cpp/src/qpid/client/MessageReplayTracker.cpp
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "MessageReplayTracker.h"
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace client {
+
+MessageReplayTracker::MessageReplayTracker(uint f) : flushInterval(f), count(0) {}
+
+void MessageReplayTracker::send(const Message& message, const std::string& destination)
+{
+ ReplayRecord record(message, destination);
+ record.send(*this);
+ if (flushInterval && ++count >= flushInterval) {
+ checkCompletion();
+ if (!buffer.empty()) session.flush();
+ }
+}
+void MessageReplayTracker::init(AsyncSession s)
+{
+ session = s;
+}
+
+void MessageReplayTracker::replay(AsyncSession s)
+{
+ session = s;
+ std::list<ReplayRecord> copy;
+ buffer.swap(copy);
+ std::for_each(copy.begin(), copy.end(), boost::bind(&ReplayRecord::send, _1, boost::ref(*this)));
+ session.flush();
+ count = 0;
+}
+
+void MessageReplayTracker::setFlushInterval(uint f)
+{
+ flushInterval = f;
+}
+
+uint MessageReplayTracker::getFlushInterval()
+{
+ return flushInterval;
+}
+
+void MessageReplayTracker::checkCompletion()
+{
+ buffer.remove_if(boost::bind(&ReplayRecord::isComplete, _1));
+}
+
+MessageReplayTracker::ReplayRecord::ReplayRecord(const Message& m, const std::string& d) : message(m), destination(d) {}
+
+void MessageReplayTracker::ReplayRecord::send(MessageReplayTracker& tracker)
+{
+ status = tracker.session.messageTransfer(arg::destination=destination, arg::content=message);
+ tracker.buffer.push_back(*this);
+}
+
+bool MessageReplayTracker::ReplayRecord::isComplete()
+{
+ return status.isComplete();
+}
+
+}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/MessageReplayTracker.h b/cpp/src/qpid/client/MessageReplayTracker.h
new file mode 100644
index 0000000000..40324de4e9
--- /dev/null
+++ b/cpp/src/qpid/client/MessageReplayTracker.h
@@ -0,0 +1,66 @@
+#ifndef QPID_CLIENT_MESSAGEREPLAYTRACKER_H
+#define QPID_CLIENT_MESSAGEREPLAYTRACKER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "AsyncSession.h"
+#include "Message.h"
+
+#include <list>
+#include <string>
+
+namespace qpid {
+namespace client {
+
+/**
+ * Utility to track messages sent asynchronously, allowing those that
+ * are indoubt to be replayed over a new session.
+ */
+class MessageReplayTracker
+{
+ public:
+ MessageReplayTracker(uint flushInterval);
+ void send(const Message& message, const std::string& destination = "");
+ void init(AsyncSession session);
+ void replay(AsyncSession session);
+ void setFlushInterval(uint interval);
+ uint getFlushInterval();
+ void checkCompletion();
+ private:
+ struct ReplayRecord
+ {
+ Completion status;
+ Message message;
+ std::string destination;
+
+ ReplayRecord(const Message& message, const std::string& destination);
+ void send(MessageReplayTracker&);
+ bool isComplete();
+ };
+
+ AsyncSession session;
+ uint flushInterval;
+ uint count;
+ std::list<ReplayRecord> buffer;
+};
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_MESSAGEREPLAYTRACKER_H*/