summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qmf/engine/ResilientConnection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qmf/engine/ResilientConnection.cpp')
-rw-r--r--qpid/cpp/src/qmf/engine/ResilientConnection.cpp514
1 files changed, 514 insertions, 0 deletions
diff --git a/qpid/cpp/src/qmf/engine/ResilientConnection.cpp b/qpid/cpp/src/qmf/engine/ResilientConnection.cpp
new file mode 100644
index 0000000000..ab65b8d768
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/ResilientConnection.cpp
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/engine/ResilientConnection.h"
+#include "qmf/engine/MessageImpl.h"
+#include "qmf/engine/ConnectionSettingsImpl.h"
+#include <qpid/client/Connection.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/MessageListener.h>
+#include <qpid/client/SubscriptionManager.h>
+#include <qpid/client/Message.h>
+#include <qpid/sys/Thread.h>
+#include <qpid/sys/Runnable.h>
+#include <qpid/sys/Mutex.h>
+#include <qpid/sys/Condition.h>
+#include <qpid/sys/Time.h>
+#include <qpid/log/Statement.h>
+#include <qpid/RefCounted.h>
+#include <boost/bind.hpp>
+#include <string>
+#include <deque>
+#include <vector>
+#include <set>
+#include <boost/intrusive_ptr.hpp>
+#include <boost/noncopyable.hpp>
+#include <unistd.h>
+#include <fcntl.h>
+
+using namespace std;
+using namespace qmf::engine;
+using namespace qpid;
+using qpid::sys::Mutex;
+
+namespace qmf {
+namespace engine {
+ struct ResilientConnectionEventImpl {
+ ResilientConnectionEvent::EventKind kind;
+ void* sessionContext;
+ string errorText;
+ MessageImpl message;
+
+ ResilientConnectionEventImpl(ResilientConnectionEvent::EventKind k,
+ const MessageImpl& m = MessageImpl()) :
+ kind(k), sessionContext(0), message(m) {}
+ ResilientConnectionEvent copy();
+ };
+
+ struct RCSession : public client::MessageListener, public qpid::sys::Runnable, public qpid::RefCounted {
+ typedef boost::intrusive_ptr<RCSession> Ptr;
+ ResilientConnectionImpl& connImpl;
+ string name;
+ client::Connection& connection;
+ client::Session session;
+ client::SubscriptionManager* subscriptions;
+ string userId;
+ void* userContext;
+ vector<string> dests;
+ qpid::sys::Thread thread;
+
+ RCSession(ResilientConnectionImpl& ci, const string& n, client::Connection& c, void* uc);
+ ~RCSession();
+ void received(client::Message& msg);
+ void run();
+ void stop();
+ };
+
+ class ResilientConnectionImpl : public qpid::sys::Runnable, public boost::noncopyable {
+ public:
+ ResilientConnectionImpl(const ConnectionSettings& settings);
+ ~ResilientConnectionImpl();
+
+ bool isConnected() const;
+ bool getEvent(ResilientConnectionEvent& event);
+ void popEvent();
+ bool createSession(const char* name, void* sessionContext, SessionHandle& handle);
+ void destroySession(SessionHandle handle);
+ void sendMessage(SessionHandle handle, qmf::engine::Message& message);
+ void declareQueue(SessionHandle handle, char* queue);
+ void deleteQueue(SessionHandle handle, char* queue);
+ void bind(SessionHandle handle, char* exchange, char* queue, char* key);
+ void unbind(SessionHandle handle, char* exchange, char* queue, char* key);
+ void setNotifyFd(int fd);
+ void notify();
+
+ void run();
+ void failure();
+ void sessionClosed(RCSession* sess);
+
+ void EnqueueEvent(ResilientConnectionEvent::EventKind kind,
+ void* sessionContext = 0,
+ const MessageImpl& message = MessageImpl(),
+ const string& errorText = "");
+
+ private:
+ int notifyFd;
+ bool connected;
+ bool shutdown;
+ string lastError;
+ const ConnectionSettings settings;
+ client::Connection connection;
+ mutable qpid::sys::Mutex lock;
+ int delayMin;
+ int delayMax;
+ int delayFactor;
+ qpid::sys::Condition cond;
+ deque<ResilientConnectionEventImpl> eventQueue;
+ set<RCSession::Ptr> sessions;
+ qpid::sys::Thread connThread;
+ };
+}
+}
+
+ResilientConnectionEvent ResilientConnectionEventImpl::copy()
+{
+ ResilientConnectionEvent item;
+
+ ::memset(&item, 0, sizeof(ResilientConnectionEvent));
+ item.kind = kind;
+ item.sessionContext = sessionContext;
+ item.message = message.copy();
+ item.errorText = const_cast<char*>(errorText.c_str());
+
+ return item;
+}
+
+RCSession::RCSession(ResilientConnectionImpl& ci, const string& n, client::Connection& c, void* uc) :
+ connImpl(ci), name(n), connection(c), session(connection.newSession(name)),
+ subscriptions(new client::SubscriptionManager(session)), userContext(uc), thread(*this)
+{
+ const qpid::client::ConnectionSettings& operSettings = connection.getNegotiatedSettings();
+ userId = operSettings.username;
+}
+
+RCSession::~RCSession()
+{
+ subscriptions->stop();
+ thread.join();
+ session.close();
+ delete subscriptions;
+}
+
+void RCSession::run()
+{
+ try {
+ subscriptions->run();
+ } catch (exception& /*e*/) {
+ connImpl.sessionClosed(this);
+ }
+}
+
+void RCSession::stop()
+{
+ subscriptions->stop();
+}
+
+void RCSession::received(client::Message& msg)
+{
+ MessageImpl qmsg;
+ qmsg.body = msg.getData();
+
+ qpid::framing::DeliveryProperties dp = msg.getDeliveryProperties();
+ if (dp.hasRoutingKey()) {
+ qmsg.routingKey = dp.getRoutingKey();
+ }
+
+ qpid::framing::MessageProperties mp = msg.getMessageProperties();
+ if (mp.hasReplyTo()) {
+ const qpid::framing::ReplyTo& rt = mp.getReplyTo();
+ qmsg.replyExchange = rt.getExchange();
+ qmsg.replyKey = rt.getRoutingKey();
+ }
+
+ if (mp.hasUserId()) {
+ qmsg.userId = mp.getUserId();
+ }
+
+ connImpl.EnqueueEvent(ResilientConnectionEvent::RECV, userContext, qmsg);
+}
+
+ResilientConnectionImpl::ResilientConnectionImpl(const ConnectionSettings& _settings) :
+ notifyFd(-1), connected(false), shutdown(false), settings(_settings), delayMin(1), connThread(*this)
+{
+ connection.registerFailureCallback(boost::bind(&ResilientConnectionImpl::failure, this));
+ settings.impl->getRetrySettings(&delayMin, &delayMax, &delayFactor);
+}
+
+ResilientConnectionImpl::~ResilientConnectionImpl()
+{
+ shutdown = true;
+ connected = false;
+ cond.notify();
+ connThread.join();
+ connection.close();
+}
+
+bool ResilientConnectionImpl::isConnected() const
+{
+ Mutex::ScopedLock _lock(lock);
+ return connected;
+}
+
+bool ResilientConnectionImpl::getEvent(ResilientConnectionEvent& event)
+{
+ Mutex::ScopedLock _lock(lock);
+ if (eventQueue.empty())
+ return false;
+ event = eventQueue.front().copy();
+ return true;
+}
+
+void ResilientConnectionImpl::popEvent()
+{
+ Mutex::ScopedLock _lock(lock);
+ if (!eventQueue.empty())
+ eventQueue.pop_front();
+}
+
+bool ResilientConnectionImpl::createSession(const char* name, void* sessionContext,
+ SessionHandle& handle)
+{
+ Mutex::ScopedLock _lock(lock);
+ if (!connected)
+ return false;
+
+ RCSession::Ptr sess = RCSession::Ptr(new RCSession(*this, name, connection, sessionContext));
+
+ handle.impl = (void*) sess.get();
+ sessions.insert(sess);
+
+ return true;
+}
+
+void ResilientConnectionImpl::destroySession(SessionHandle handle)
+{
+ Mutex::ScopedLock _lock(lock);
+ RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.impl);
+ set<RCSession::Ptr>::iterator iter = sessions.find(sess);
+ if (iter != sessions.end()) {
+ for (vector<string>::iterator dIter = sess->dests.begin(); dIter != sess->dests.end(); dIter++)
+ sess->subscriptions->cancel(dIter->c_str());
+ sess->subscriptions->stop();
+ sess->subscriptions->wait();
+
+ sessions.erase(iter);
+ return;
+ }
+}
+
+void ResilientConnectionImpl::sendMessage(SessionHandle handle, qmf::engine::Message& message)
+{
+ Mutex::ScopedLock _lock(lock);
+ RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.impl);
+ set<RCSession::Ptr>::iterator iter = sessions.find(sess);
+ qpid::client::Message msg;
+ string data(message.body, message.length);
+ msg.getDeliveryProperties().setRoutingKey(message.routingKey);
+ msg.getMessageProperties().setReplyTo(qpid::framing::ReplyTo(message.replyExchange, message.replyKey));
+ if (settings.impl->getSendUserId())
+ msg.getMessageProperties().setUserId(sess->userId);
+ msg.setData(data);
+
+ try {
+ sess->session.messageTransfer(client::arg::content=msg, client::arg::destination=message.destination);
+ } catch(exception& e) {
+ QPID_LOG(error, "Session Exception during message-transfer: " << e.what());
+ sessions.erase(iter);
+ EnqueueEvent(ResilientConnectionEvent::SESSION_CLOSED, (*iter)->userContext);
+ }
+}
+
+void ResilientConnectionImpl::declareQueue(SessionHandle handle, char* queue)
+{
+ Mutex::ScopedLock _lock(lock);
+ RCSession* sess = (RCSession*) handle.impl;
+
+ sess->session.queueDeclare(client::arg::queue=queue, client::arg::autoDelete=true, client::arg::exclusive=true);
+ sess->subscriptions->setAcceptMode(client::ACCEPT_MODE_NONE);
+ sess->subscriptions->setAcquireMode(client::ACQUIRE_MODE_PRE_ACQUIRED);
+ sess->subscriptions->subscribe(*sess, queue, queue);
+ sess->subscriptions->setFlowControl(queue, client::FlowControl::unlimited());
+ sess->dests.push_back(string(queue));
+}
+
+void ResilientConnectionImpl::deleteQueue(SessionHandle handle, char* queue)
+{
+ Mutex::ScopedLock _lock(lock);
+ RCSession* sess = (RCSession*) handle.impl;
+
+ sess->session.queueDelete(client::arg::queue=queue);
+ for (vector<string>::iterator iter = sess->dests.begin();
+ iter != sess->dests.end(); iter++)
+ if (*iter == queue) {
+ sess->subscriptions->cancel(queue);
+ sess->dests.erase(iter);
+ break;
+ }
+}
+
+void ResilientConnectionImpl::bind(SessionHandle handle,
+ char* exchange, char* queue, char* key)
+{
+ Mutex::ScopedLock _lock(lock);
+ RCSession* sess = (RCSession*) handle.impl;
+
+ sess->session.exchangeBind(client::arg::exchange=exchange, client::arg::queue=queue, client::arg::bindingKey=key);
+}
+
+void ResilientConnectionImpl::unbind(SessionHandle handle,
+ char* exchange, char* queue, char* key)
+{
+ Mutex::ScopedLock _lock(lock);
+ RCSession* sess = (RCSession*) handle.impl;
+
+ sess->session.exchangeUnbind(client::arg::exchange=exchange, client::arg::queue=queue, client::arg::bindingKey=key);
+}
+
+void ResilientConnectionImpl::notify()
+{
+ if (notifyFd != -1)
+ {
+ int unused_ret; //Suppress warnings about ignoring return value.
+ unused_ret = ::write(notifyFd, ".", 1);
+ }
+}
+
+
+void ResilientConnectionImpl::setNotifyFd(int fd)
+{
+ notifyFd = fd;
+ if (notifyFd > 0) {
+ int original = fcntl(notifyFd, F_GETFL);
+ fcntl(notifyFd, F_SETFL, O_NONBLOCK | original);
+ }
+}
+
+void ResilientConnectionImpl::run()
+{
+ int delay(delayMin);
+
+ while (true) {
+ try {
+ QPID_LOG(trace, "Trying to open connection...");
+ connection.open(settings.impl->getClientSettings());
+ {
+ Mutex::ScopedLock _lock(lock);
+ connected = true;
+ EnqueueEvent(ResilientConnectionEvent::CONNECTED);
+
+ while (connected)
+ cond.wait(lock);
+ delay = delayMin;
+
+ while (!sessions.empty()) {
+ set<RCSession::Ptr>::iterator iter = sessions.begin();
+ RCSession::Ptr sess = *iter;
+ sessions.erase(iter);
+ EnqueueEvent(ResilientConnectionEvent::SESSION_CLOSED, sess->userContext);
+ Mutex::ScopedUnlock _u(lock);
+ sess->stop();
+
+ // Nullify the intrusive pointer within the scoped unlock, otherwise,
+ // the reference is held until overwritted above (under lock) which causes
+ // the session destructor to be called with the lock held.
+ sess = 0;
+ }
+
+ EnqueueEvent(ResilientConnectionEvent::DISCONNECTED);
+
+ if (shutdown)
+ return;
+ }
+ connection.close();
+ } catch (exception &e) {
+ QPID_LOG(debug, "connection.open exception: " << e.what());
+ Mutex::ScopedLock _lock(lock);
+ lastError = e.what();
+ if (delay < delayMax)
+ delay *= delayFactor;
+ }
+
+ ::qpid::sys::sleep(delay);
+ }
+}
+
+void ResilientConnectionImpl::failure()
+{
+ Mutex::ScopedLock _lock(lock);
+
+ connected = false;
+ lastError = "Closed by Peer";
+ cond.notify();
+}
+
+void ResilientConnectionImpl::sessionClosed(RCSession*)
+{
+ Mutex::ScopedLock _lock(lock);
+ connected = false;
+ lastError = "Closed due to Session failure";
+ cond.notify();
+}
+
+void ResilientConnectionImpl::EnqueueEvent(ResilientConnectionEvent::EventKind kind,
+ void* sessionContext,
+ const MessageImpl& message,
+ const string& errorText)
+{
+ {
+ Mutex::ScopedLock _lock(lock);
+ ResilientConnectionEventImpl event(kind, message);
+
+ event.sessionContext = sessionContext;
+ event.errorText = errorText;
+
+ eventQueue.push_back(event);
+ }
+
+ if (notifyFd != -1)
+ {
+ int unused_ret; //Suppress warnings about ignoring return value.
+ unused_ret = ::write(notifyFd, ".", 1);
+ }
+}
+
+
+//==================================================================
+// Wrappers
+//==================================================================
+
+ResilientConnection::ResilientConnection(const ConnectionSettings& settings)
+{
+ impl = new ResilientConnectionImpl(settings);
+}
+
+ResilientConnection::~ResilientConnection()
+{
+ delete impl;
+}
+
+bool ResilientConnection::isConnected() const
+{
+ return impl->isConnected();
+}
+
+bool ResilientConnection::getEvent(ResilientConnectionEvent& event)
+{
+ return impl->getEvent(event);
+}
+
+void ResilientConnection::popEvent()
+{
+ impl->popEvent();
+}
+
+bool ResilientConnection::createSession(const char* name, void* sessionContext, SessionHandle& handle)
+{
+ return impl->createSession(name, sessionContext, handle);
+}
+
+void ResilientConnection::destroySession(SessionHandle handle)
+{
+ impl->destroySession(handle);
+}
+
+void ResilientConnection::sendMessage(SessionHandle handle, qmf::engine::Message& message)
+{
+ impl->sendMessage(handle, message);
+}
+
+void ResilientConnection::declareQueue(SessionHandle handle, char* queue)
+{
+ impl->declareQueue(handle, queue);
+}
+
+void ResilientConnection::deleteQueue(SessionHandle handle, char* queue)
+{
+ impl->deleteQueue(handle, queue);
+}
+
+void ResilientConnection::bind(SessionHandle handle, char* exchange, char* queue, char* key)
+{
+ impl->bind(handle, exchange, queue, key);
+}
+
+void ResilientConnection::unbind(SessionHandle handle, char* exchange, char* queue, char* key)
+{
+ impl->unbind(handle, exchange, queue, key);
+}
+
+void ResilientConnection::setNotifyFd(int fd)
+{
+ impl->setNotifyFd(fd);
+}
+
+void ResilientConnection::notify()
+{
+ impl->notify();
+}
+