summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qmf/engine/ResilientConnection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qmf/engine/ResilientConnection.cpp')
-rw-r--r--qpid/cpp/src/qmf/engine/ResilientConnection.cpp514
1 files changed, 0 insertions, 514 deletions
diff --git a/qpid/cpp/src/qmf/engine/ResilientConnection.cpp b/qpid/cpp/src/qmf/engine/ResilientConnection.cpp
deleted file mode 100644
index ab65b8d768..0000000000
--- a/qpid/cpp/src/qmf/engine/ResilientConnection.cpp
+++ /dev/null
@@ -1,514 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/engine/ResilientConnection.h"
-#include "qmf/engine/MessageImpl.h"
-#include "qmf/engine/ConnectionSettingsImpl.h"
-#include <qpid/client/Connection.h>
-#include <qpid/client/Session.h>
-#include <qpid/client/MessageListener.h>
-#include <qpid/client/SubscriptionManager.h>
-#include <qpid/client/Message.h>
-#include <qpid/sys/Thread.h>
-#include <qpid/sys/Runnable.h>
-#include <qpid/sys/Mutex.h>
-#include <qpid/sys/Condition.h>
-#include <qpid/sys/Time.h>
-#include <qpid/log/Statement.h>
-#include <qpid/RefCounted.h>
-#include <boost/bind.hpp>
-#include <string>
-#include <deque>
-#include <vector>
-#include <set>
-#include <boost/intrusive_ptr.hpp>
-#include <boost/noncopyable.hpp>
-#include <unistd.h>
-#include <fcntl.h>
-
-using namespace std;
-using namespace qmf::engine;
-using namespace qpid;
-using qpid::sys::Mutex;
-
-namespace qmf {
-namespace engine {
- struct ResilientConnectionEventImpl {
- ResilientConnectionEvent::EventKind kind;
- void* sessionContext;
- string errorText;
- MessageImpl message;
-
- ResilientConnectionEventImpl(ResilientConnectionEvent::EventKind k,
- const MessageImpl& m = MessageImpl()) :
- kind(k), sessionContext(0), message(m) {}
- ResilientConnectionEvent copy();
- };
-
- struct RCSession : public client::MessageListener, public qpid::sys::Runnable, public qpid::RefCounted {
- typedef boost::intrusive_ptr<RCSession> Ptr;
- ResilientConnectionImpl& connImpl;
- string name;
- client::Connection& connection;
- client::Session session;
- client::SubscriptionManager* subscriptions;
- string userId;
- void* userContext;
- vector<string> dests;
- qpid::sys::Thread thread;
-
- RCSession(ResilientConnectionImpl& ci, const string& n, client::Connection& c, void* uc);
- ~RCSession();
- void received(client::Message& msg);
- void run();
- void stop();
- };
-
- class ResilientConnectionImpl : public qpid::sys::Runnable, public boost::noncopyable {
- public:
- ResilientConnectionImpl(const ConnectionSettings& settings);
- ~ResilientConnectionImpl();
-
- bool isConnected() const;
- bool getEvent(ResilientConnectionEvent& event);
- void popEvent();
- bool createSession(const char* name, void* sessionContext, SessionHandle& handle);
- void destroySession(SessionHandle handle);
- void sendMessage(SessionHandle handle, qmf::engine::Message& message);
- void declareQueue(SessionHandle handle, char* queue);
- void deleteQueue(SessionHandle handle, char* queue);
- void bind(SessionHandle handle, char* exchange, char* queue, char* key);
- void unbind(SessionHandle handle, char* exchange, char* queue, char* key);
- void setNotifyFd(int fd);
- void notify();
-
- void run();
- void failure();
- void sessionClosed(RCSession* sess);
-
- void EnqueueEvent(ResilientConnectionEvent::EventKind kind,
- void* sessionContext = 0,
- const MessageImpl& message = MessageImpl(),
- const string& errorText = "");
-
- private:
- int notifyFd;
- bool connected;
- bool shutdown;
- string lastError;
- const ConnectionSettings settings;
- client::Connection connection;
- mutable qpid::sys::Mutex lock;
- int delayMin;
- int delayMax;
- int delayFactor;
- qpid::sys::Condition cond;
- deque<ResilientConnectionEventImpl> eventQueue;
- set<RCSession::Ptr> sessions;
- qpid::sys::Thread connThread;
- };
-}
-}
-
-ResilientConnectionEvent ResilientConnectionEventImpl::copy()
-{
- ResilientConnectionEvent item;
-
- ::memset(&item, 0, sizeof(ResilientConnectionEvent));
- item.kind = kind;
- item.sessionContext = sessionContext;
- item.message = message.copy();
- item.errorText = const_cast<char*>(errorText.c_str());
-
- return item;
-}
-
-RCSession::RCSession(ResilientConnectionImpl& ci, const string& n, client::Connection& c, void* uc) :
- connImpl(ci), name(n), connection(c), session(connection.newSession(name)),
- subscriptions(new client::SubscriptionManager(session)), userContext(uc), thread(*this)
-{
- const qpid::client::ConnectionSettings& operSettings = connection.getNegotiatedSettings();
- userId = operSettings.username;
-}
-
-RCSession::~RCSession()
-{
- subscriptions->stop();
- thread.join();
- session.close();
- delete subscriptions;
-}
-
-void RCSession::run()
-{
- try {
- subscriptions->run();
- } catch (exception& /*e*/) {
- connImpl.sessionClosed(this);
- }
-}
-
-void RCSession::stop()
-{
- subscriptions->stop();
-}
-
-void RCSession::received(client::Message& msg)
-{
- MessageImpl qmsg;
- qmsg.body = msg.getData();
-
- qpid::framing::DeliveryProperties dp = msg.getDeliveryProperties();
- if (dp.hasRoutingKey()) {
- qmsg.routingKey = dp.getRoutingKey();
- }
-
- qpid::framing::MessageProperties mp = msg.getMessageProperties();
- if (mp.hasReplyTo()) {
- const qpid::framing::ReplyTo& rt = mp.getReplyTo();
- qmsg.replyExchange = rt.getExchange();
- qmsg.replyKey = rt.getRoutingKey();
- }
-
- if (mp.hasUserId()) {
- qmsg.userId = mp.getUserId();
- }
-
- connImpl.EnqueueEvent(ResilientConnectionEvent::RECV, userContext, qmsg);
-}
-
-ResilientConnectionImpl::ResilientConnectionImpl(const ConnectionSettings& _settings) :
- notifyFd(-1), connected(false), shutdown(false), settings(_settings), delayMin(1), connThread(*this)
-{
- connection.registerFailureCallback(boost::bind(&ResilientConnectionImpl::failure, this));
- settings.impl->getRetrySettings(&delayMin, &delayMax, &delayFactor);
-}
-
-ResilientConnectionImpl::~ResilientConnectionImpl()
-{
- shutdown = true;
- connected = false;
- cond.notify();
- connThread.join();
- connection.close();
-}
-
-bool ResilientConnectionImpl::isConnected() const
-{
- Mutex::ScopedLock _lock(lock);
- return connected;
-}
-
-bool ResilientConnectionImpl::getEvent(ResilientConnectionEvent& event)
-{
- Mutex::ScopedLock _lock(lock);
- if (eventQueue.empty())
- return false;
- event = eventQueue.front().copy();
- return true;
-}
-
-void ResilientConnectionImpl::popEvent()
-{
- Mutex::ScopedLock _lock(lock);
- if (!eventQueue.empty())
- eventQueue.pop_front();
-}
-
-bool ResilientConnectionImpl::createSession(const char* name, void* sessionContext,
- SessionHandle& handle)
-{
- Mutex::ScopedLock _lock(lock);
- if (!connected)
- return false;
-
- RCSession::Ptr sess = RCSession::Ptr(new RCSession(*this, name, connection, sessionContext));
-
- handle.impl = (void*) sess.get();
- sessions.insert(sess);
-
- return true;
-}
-
-void ResilientConnectionImpl::destroySession(SessionHandle handle)
-{
- Mutex::ScopedLock _lock(lock);
- RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.impl);
- set<RCSession::Ptr>::iterator iter = sessions.find(sess);
- if (iter != sessions.end()) {
- for (vector<string>::iterator dIter = sess->dests.begin(); dIter != sess->dests.end(); dIter++)
- sess->subscriptions->cancel(dIter->c_str());
- sess->subscriptions->stop();
- sess->subscriptions->wait();
-
- sessions.erase(iter);
- return;
- }
-}
-
-void ResilientConnectionImpl::sendMessage(SessionHandle handle, qmf::engine::Message& message)
-{
- Mutex::ScopedLock _lock(lock);
- RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.impl);
- set<RCSession::Ptr>::iterator iter = sessions.find(sess);
- qpid::client::Message msg;
- string data(message.body, message.length);
- msg.getDeliveryProperties().setRoutingKey(message.routingKey);
- msg.getMessageProperties().setReplyTo(qpid::framing::ReplyTo(message.replyExchange, message.replyKey));
- if (settings.impl->getSendUserId())
- msg.getMessageProperties().setUserId(sess->userId);
- msg.setData(data);
-
- try {
- sess->session.messageTransfer(client::arg::content=msg, client::arg::destination=message.destination);
- } catch(exception& e) {
- QPID_LOG(error, "Session Exception during message-transfer: " << e.what());
- sessions.erase(iter);
- EnqueueEvent(ResilientConnectionEvent::SESSION_CLOSED, (*iter)->userContext);
- }
-}
-
-void ResilientConnectionImpl::declareQueue(SessionHandle handle, char* queue)
-{
- Mutex::ScopedLock _lock(lock);
- RCSession* sess = (RCSession*) handle.impl;
-
- sess->session.queueDeclare(client::arg::queue=queue, client::arg::autoDelete=true, client::arg::exclusive=true);
- sess->subscriptions->setAcceptMode(client::ACCEPT_MODE_NONE);
- sess->subscriptions->setAcquireMode(client::ACQUIRE_MODE_PRE_ACQUIRED);
- sess->subscriptions->subscribe(*sess, queue, queue);
- sess->subscriptions->setFlowControl(queue, client::FlowControl::unlimited());
- sess->dests.push_back(string(queue));
-}
-
-void ResilientConnectionImpl::deleteQueue(SessionHandle handle, char* queue)
-{
- Mutex::ScopedLock _lock(lock);
- RCSession* sess = (RCSession*) handle.impl;
-
- sess->session.queueDelete(client::arg::queue=queue);
- for (vector<string>::iterator iter = sess->dests.begin();
- iter != sess->dests.end(); iter++)
- if (*iter == queue) {
- sess->subscriptions->cancel(queue);
- sess->dests.erase(iter);
- break;
- }
-}
-
-void ResilientConnectionImpl::bind(SessionHandle handle,
- char* exchange, char* queue, char* key)
-{
- Mutex::ScopedLock _lock(lock);
- RCSession* sess = (RCSession*) handle.impl;
-
- sess->session.exchangeBind(client::arg::exchange=exchange, client::arg::queue=queue, client::arg::bindingKey=key);
-}
-
-void ResilientConnectionImpl::unbind(SessionHandle handle,
- char* exchange, char* queue, char* key)
-{
- Mutex::ScopedLock _lock(lock);
- RCSession* sess = (RCSession*) handle.impl;
-
- sess->session.exchangeUnbind(client::arg::exchange=exchange, client::arg::queue=queue, client::arg::bindingKey=key);
-}
-
-void ResilientConnectionImpl::notify()
-{
- if (notifyFd != -1)
- {
- int unused_ret; //Suppress warnings about ignoring return value.
- unused_ret = ::write(notifyFd, ".", 1);
- }
-}
-
-
-void ResilientConnectionImpl::setNotifyFd(int fd)
-{
- notifyFd = fd;
- if (notifyFd > 0) {
- int original = fcntl(notifyFd, F_GETFL);
- fcntl(notifyFd, F_SETFL, O_NONBLOCK | original);
- }
-}
-
-void ResilientConnectionImpl::run()
-{
- int delay(delayMin);
-
- while (true) {
- try {
- QPID_LOG(trace, "Trying to open connection...");
- connection.open(settings.impl->getClientSettings());
- {
- Mutex::ScopedLock _lock(lock);
- connected = true;
- EnqueueEvent(ResilientConnectionEvent::CONNECTED);
-
- while (connected)
- cond.wait(lock);
- delay = delayMin;
-
- while (!sessions.empty()) {
- set<RCSession::Ptr>::iterator iter = sessions.begin();
- RCSession::Ptr sess = *iter;
- sessions.erase(iter);
- EnqueueEvent(ResilientConnectionEvent::SESSION_CLOSED, sess->userContext);
- Mutex::ScopedUnlock _u(lock);
- sess->stop();
-
- // Nullify the intrusive pointer within the scoped unlock, otherwise,
- // the reference is held until overwritted above (under lock) which causes
- // the session destructor to be called with the lock held.
- sess = 0;
- }
-
- EnqueueEvent(ResilientConnectionEvent::DISCONNECTED);
-
- if (shutdown)
- return;
- }
- connection.close();
- } catch (exception &e) {
- QPID_LOG(debug, "connection.open exception: " << e.what());
- Mutex::ScopedLock _lock(lock);
- lastError = e.what();
- if (delay < delayMax)
- delay *= delayFactor;
- }
-
- ::qpid::sys::sleep(delay);
- }
-}
-
-void ResilientConnectionImpl::failure()
-{
- Mutex::ScopedLock _lock(lock);
-
- connected = false;
- lastError = "Closed by Peer";
- cond.notify();
-}
-
-void ResilientConnectionImpl::sessionClosed(RCSession*)
-{
- Mutex::ScopedLock _lock(lock);
- connected = false;
- lastError = "Closed due to Session failure";
- cond.notify();
-}
-
-void ResilientConnectionImpl::EnqueueEvent(ResilientConnectionEvent::EventKind kind,
- void* sessionContext,
- const MessageImpl& message,
- const string& errorText)
-{
- {
- Mutex::ScopedLock _lock(lock);
- ResilientConnectionEventImpl event(kind, message);
-
- event.sessionContext = sessionContext;
- event.errorText = errorText;
-
- eventQueue.push_back(event);
- }
-
- if (notifyFd != -1)
- {
- int unused_ret; //Suppress warnings about ignoring return value.
- unused_ret = ::write(notifyFd, ".", 1);
- }
-}
-
-
-//==================================================================
-// Wrappers
-//==================================================================
-
-ResilientConnection::ResilientConnection(const ConnectionSettings& settings)
-{
- impl = new ResilientConnectionImpl(settings);
-}
-
-ResilientConnection::~ResilientConnection()
-{
- delete impl;
-}
-
-bool ResilientConnection::isConnected() const
-{
- return impl->isConnected();
-}
-
-bool ResilientConnection::getEvent(ResilientConnectionEvent& event)
-{
- return impl->getEvent(event);
-}
-
-void ResilientConnection::popEvent()
-{
- impl->popEvent();
-}
-
-bool ResilientConnection::createSession(const char* name, void* sessionContext, SessionHandle& handle)
-{
- return impl->createSession(name, sessionContext, handle);
-}
-
-void ResilientConnection::destroySession(SessionHandle handle)
-{
- impl->destroySession(handle);
-}
-
-void ResilientConnection::sendMessage(SessionHandle handle, qmf::engine::Message& message)
-{
- impl->sendMessage(handle, message);
-}
-
-void ResilientConnection::declareQueue(SessionHandle handle, char* queue)
-{
- impl->declareQueue(handle, queue);
-}
-
-void ResilientConnection::deleteQueue(SessionHandle handle, char* queue)
-{
- impl->deleteQueue(handle, queue);
-}
-
-void ResilientConnection::bind(SessionHandle handle, char* exchange, char* queue, char* key)
-{
- impl->bind(handle, exchange, queue, key);
-}
-
-void ResilientConnection::unbind(SessionHandle handle, char* exchange, char* queue, char* key)
-{
- impl->unbind(handle, exchange, queue, key);
-}
-
-void ResilientConnection::setNotifyFd(int fd)
-{
- impl->setNotifyFd(fd);
-}
-
-void ResilientConnection::notify()
-{
- impl->notify();
-}
-