diff options
Diffstat (limited to 'qpid/cpp/src/qmf/engine/ResilientConnection.cpp')
-rw-r--r-- | qpid/cpp/src/qmf/engine/ResilientConnection.cpp | 514 |
1 files changed, 0 insertions, 514 deletions
diff --git a/qpid/cpp/src/qmf/engine/ResilientConnection.cpp b/qpid/cpp/src/qmf/engine/ResilientConnection.cpp deleted file mode 100644 index ab65b8d768..0000000000 --- a/qpid/cpp/src/qmf/engine/ResilientConnection.cpp +++ /dev/null @@ -1,514 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "qmf/engine/ResilientConnection.h" -#include "qmf/engine/MessageImpl.h" -#include "qmf/engine/ConnectionSettingsImpl.h" -#include <qpid/client/Connection.h> -#include <qpid/client/Session.h> -#include <qpid/client/MessageListener.h> -#include <qpid/client/SubscriptionManager.h> -#include <qpid/client/Message.h> -#include <qpid/sys/Thread.h> -#include <qpid/sys/Runnable.h> -#include <qpid/sys/Mutex.h> -#include <qpid/sys/Condition.h> -#include <qpid/sys/Time.h> -#include <qpid/log/Statement.h> -#include <qpid/RefCounted.h> -#include <boost/bind.hpp> -#include <string> -#include <deque> -#include <vector> -#include <set> -#include <boost/intrusive_ptr.hpp> -#include <boost/noncopyable.hpp> -#include <unistd.h> -#include <fcntl.h> - -using namespace std; -using namespace qmf::engine; -using namespace qpid; -using qpid::sys::Mutex; - -namespace qmf { -namespace engine { - struct ResilientConnectionEventImpl { - ResilientConnectionEvent::EventKind kind; - void* sessionContext; - string errorText; - MessageImpl message; - - ResilientConnectionEventImpl(ResilientConnectionEvent::EventKind k, - const MessageImpl& m = MessageImpl()) : - kind(k), sessionContext(0), message(m) {} - ResilientConnectionEvent copy(); - }; - - struct RCSession : public client::MessageListener, public qpid::sys::Runnable, public qpid::RefCounted { - typedef boost::intrusive_ptr<RCSession> Ptr; - ResilientConnectionImpl& connImpl; - string name; - client::Connection& connection; - client::Session session; - client::SubscriptionManager* subscriptions; - string userId; - void* userContext; - vector<string> dests; - qpid::sys::Thread thread; - - RCSession(ResilientConnectionImpl& ci, const string& n, client::Connection& c, void* uc); - ~RCSession(); - void received(client::Message& msg); - void run(); - void stop(); - }; - - class ResilientConnectionImpl : public qpid::sys::Runnable, public boost::noncopyable { - public: - ResilientConnectionImpl(const ConnectionSettings& settings); - ~ResilientConnectionImpl(); - - bool isConnected() const; - bool getEvent(ResilientConnectionEvent& event); - void popEvent(); - bool createSession(const char* name, void* sessionContext, SessionHandle& handle); - void destroySession(SessionHandle handle); - void sendMessage(SessionHandle handle, qmf::engine::Message& message); - void declareQueue(SessionHandle handle, char* queue); - void deleteQueue(SessionHandle handle, char* queue); - void bind(SessionHandle handle, char* exchange, char* queue, char* key); - void unbind(SessionHandle handle, char* exchange, char* queue, char* key); - void setNotifyFd(int fd); - void notify(); - - void run(); - void failure(); - void sessionClosed(RCSession* sess); - - void EnqueueEvent(ResilientConnectionEvent::EventKind kind, - void* sessionContext = 0, - const MessageImpl& message = MessageImpl(), - const string& errorText = ""); - - private: - int notifyFd; - bool connected; - bool shutdown; - string lastError; - const ConnectionSettings settings; - client::Connection connection; - mutable qpid::sys::Mutex lock; - int delayMin; - int delayMax; - int delayFactor; - qpid::sys::Condition cond; - deque<ResilientConnectionEventImpl> eventQueue; - set<RCSession::Ptr> sessions; - qpid::sys::Thread connThread; - }; -} -} - -ResilientConnectionEvent ResilientConnectionEventImpl::copy() -{ - ResilientConnectionEvent item; - - ::memset(&item, 0, sizeof(ResilientConnectionEvent)); - item.kind = kind; - item.sessionContext = sessionContext; - item.message = message.copy(); - item.errorText = const_cast<char*>(errorText.c_str()); - - return item; -} - -RCSession::RCSession(ResilientConnectionImpl& ci, const string& n, client::Connection& c, void* uc) : - connImpl(ci), name(n), connection(c), session(connection.newSession(name)), - subscriptions(new client::SubscriptionManager(session)), userContext(uc), thread(*this) -{ - const qpid::client::ConnectionSettings& operSettings = connection.getNegotiatedSettings(); - userId = operSettings.username; -} - -RCSession::~RCSession() -{ - subscriptions->stop(); - thread.join(); - session.close(); - delete subscriptions; -} - -void RCSession::run() -{ - try { - subscriptions->run(); - } catch (exception& /*e*/) { - connImpl.sessionClosed(this); - } -} - -void RCSession::stop() -{ - subscriptions->stop(); -} - -void RCSession::received(client::Message& msg) -{ - MessageImpl qmsg; - qmsg.body = msg.getData(); - - qpid::framing::DeliveryProperties dp = msg.getDeliveryProperties(); - if (dp.hasRoutingKey()) { - qmsg.routingKey = dp.getRoutingKey(); - } - - qpid::framing::MessageProperties mp = msg.getMessageProperties(); - if (mp.hasReplyTo()) { - const qpid::framing::ReplyTo& rt = mp.getReplyTo(); - qmsg.replyExchange = rt.getExchange(); - qmsg.replyKey = rt.getRoutingKey(); - } - - if (mp.hasUserId()) { - qmsg.userId = mp.getUserId(); - } - - connImpl.EnqueueEvent(ResilientConnectionEvent::RECV, userContext, qmsg); -} - -ResilientConnectionImpl::ResilientConnectionImpl(const ConnectionSettings& _settings) : - notifyFd(-1), connected(false), shutdown(false), settings(_settings), delayMin(1), connThread(*this) -{ - connection.registerFailureCallback(boost::bind(&ResilientConnectionImpl::failure, this)); - settings.impl->getRetrySettings(&delayMin, &delayMax, &delayFactor); -} - -ResilientConnectionImpl::~ResilientConnectionImpl() -{ - shutdown = true; - connected = false; - cond.notify(); - connThread.join(); - connection.close(); -} - -bool ResilientConnectionImpl::isConnected() const -{ - Mutex::ScopedLock _lock(lock); - return connected; -} - -bool ResilientConnectionImpl::getEvent(ResilientConnectionEvent& event) -{ - Mutex::ScopedLock _lock(lock); - if (eventQueue.empty()) - return false; - event = eventQueue.front().copy(); - return true; -} - -void ResilientConnectionImpl::popEvent() -{ - Mutex::ScopedLock _lock(lock); - if (!eventQueue.empty()) - eventQueue.pop_front(); -} - -bool ResilientConnectionImpl::createSession(const char* name, void* sessionContext, - SessionHandle& handle) -{ - Mutex::ScopedLock _lock(lock); - if (!connected) - return false; - - RCSession::Ptr sess = RCSession::Ptr(new RCSession(*this, name, connection, sessionContext)); - - handle.impl = (void*) sess.get(); - sessions.insert(sess); - - return true; -} - -void ResilientConnectionImpl::destroySession(SessionHandle handle) -{ - Mutex::ScopedLock _lock(lock); - RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.impl); - set<RCSession::Ptr>::iterator iter = sessions.find(sess); - if (iter != sessions.end()) { - for (vector<string>::iterator dIter = sess->dests.begin(); dIter != sess->dests.end(); dIter++) - sess->subscriptions->cancel(dIter->c_str()); - sess->subscriptions->stop(); - sess->subscriptions->wait(); - - sessions.erase(iter); - return; - } -} - -void ResilientConnectionImpl::sendMessage(SessionHandle handle, qmf::engine::Message& message) -{ - Mutex::ScopedLock _lock(lock); - RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.impl); - set<RCSession::Ptr>::iterator iter = sessions.find(sess); - qpid::client::Message msg; - string data(message.body, message.length); - msg.getDeliveryProperties().setRoutingKey(message.routingKey); - msg.getMessageProperties().setReplyTo(qpid::framing::ReplyTo(message.replyExchange, message.replyKey)); - if (settings.impl->getSendUserId()) - msg.getMessageProperties().setUserId(sess->userId); - msg.setData(data); - - try { - sess->session.messageTransfer(client::arg::content=msg, client::arg::destination=message.destination); - } catch(exception& e) { - QPID_LOG(error, "Session Exception during message-transfer: " << e.what()); - sessions.erase(iter); - EnqueueEvent(ResilientConnectionEvent::SESSION_CLOSED, (*iter)->userContext); - } -} - -void ResilientConnectionImpl::declareQueue(SessionHandle handle, char* queue) -{ - Mutex::ScopedLock _lock(lock); - RCSession* sess = (RCSession*) handle.impl; - - sess->session.queueDeclare(client::arg::queue=queue, client::arg::autoDelete=true, client::arg::exclusive=true); - sess->subscriptions->setAcceptMode(client::ACCEPT_MODE_NONE); - sess->subscriptions->setAcquireMode(client::ACQUIRE_MODE_PRE_ACQUIRED); - sess->subscriptions->subscribe(*sess, queue, queue); - sess->subscriptions->setFlowControl(queue, client::FlowControl::unlimited()); - sess->dests.push_back(string(queue)); -} - -void ResilientConnectionImpl::deleteQueue(SessionHandle handle, char* queue) -{ - Mutex::ScopedLock _lock(lock); - RCSession* sess = (RCSession*) handle.impl; - - sess->session.queueDelete(client::arg::queue=queue); - for (vector<string>::iterator iter = sess->dests.begin(); - iter != sess->dests.end(); iter++) - if (*iter == queue) { - sess->subscriptions->cancel(queue); - sess->dests.erase(iter); - break; - } -} - -void ResilientConnectionImpl::bind(SessionHandle handle, - char* exchange, char* queue, char* key) -{ - Mutex::ScopedLock _lock(lock); - RCSession* sess = (RCSession*) handle.impl; - - sess->session.exchangeBind(client::arg::exchange=exchange, client::arg::queue=queue, client::arg::bindingKey=key); -} - -void ResilientConnectionImpl::unbind(SessionHandle handle, - char* exchange, char* queue, char* key) -{ - Mutex::ScopedLock _lock(lock); - RCSession* sess = (RCSession*) handle.impl; - - sess->session.exchangeUnbind(client::arg::exchange=exchange, client::arg::queue=queue, client::arg::bindingKey=key); -} - -void ResilientConnectionImpl::notify() -{ - if (notifyFd != -1) - { - int unused_ret; //Suppress warnings about ignoring return value. - unused_ret = ::write(notifyFd, ".", 1); - } -} - - -void ResilientConnectionImpl::setNotifyFd(int fd) -{ - notifyFd = fd; - if (notifyFd > 0) { - int original = fcntl(notifyFd, F_GETFL); - fcntl(notifyFd, F_SETFL, O_NONBLOCK | original); - } -} - -void ResilientConnectionImpl::run() -{ - int delay(delayMin); - - while (true) { - try { - QPID_LOG(trace, "Trying to open connection..."); - connection.open(settings.impl->getClientSettings()); - { - Mutex::ScopedLock _lock(lock); - connected = true; - EnqueueEvent(ResilientConnectionEvent::CONNECTED); - - while (connected) - cond.wait(lock); - delay = delayMin; - - while (!sessions.empty()) { - set<RCSession::Ptr>::iterator iter = sessions.begin(); - RCSession::Ptr sess = *iter; - sessions.erase(iter); - EnqueueEvent(ResilientConnectionEvent::SESSION_CLOSED, sess->userContext); - Mutex::ScopedUnlock _u(lock); - sess->stop(); - - // Nullify the intrusive pointer within the scoped unlock, otherwise, - // the reference is held until overwritted above (under lock) which causes - // the session destructor to be called with the lock held. - sess = 0; - } - - EnqueueEvent(ResilientConnectionEvent::DISCONNECTED); - - if (shutdown) - return; - } - connection.close(); - } catch (exception &e) { - QPID_LOG(debug, "connection.open exception: " << e.what()); - Mutex::ScopedLock _lock(lock); - lastError = e.what(); - if (delay < delayMax) - delay *= delayFactor; - } - - ::qpid::sys::sleep(delay); - } -} - -void ResilientConnectionImpl::failure() -{ - Mutex::ScopedLock _lock(lock); - - connected = false; - lastError = "Closed by Peer"; - cond.notify(); -} - -void ResilientConnectionImpl::sessionClosed(RCSession*) -{ - Mutex::ScopedLock _lock(lock); - connected = false; - lastError = "Closed due to Session failure"; - cond.notify(); -} - -void ResilientConnectionImpl::EnqueueEvent(ResilientConnectionEvent::EventKind kind, - void* sessionContext, - const MessageImpl& message, - const string& errorText) -{ - { - Mutex::ScopedLock _lock(lock); - ResilientConnectionEventImpl event(kind, message); - - event.sessionContext = sessionContext; - event.errorText = errorText; - - eventQueue.push_back(event); - } - - if (notifyFd != -1) - { - int unused_ret; //Suppress warnings about ignoring return value. - unused_ret = ::write(notifyFd, ".", 1); - } -} - - -//================================================================== -// Wrappers -//================================================================== - -ResilientConnection::ResilientConnection(const ConnectionSettings& settings) -{ - impl = new ResilientConnectionImpl(settings); -} - -ResilientConnection::~ResilientConnection() -{ - delete impl; -} - -bool ResilientConnection::isConnected() const -{ - return impl->isConnected(); -} - -bool ResilientConnection::getEvent(ResilientConnectionEvent& event) -{ - return impl->getEvent(event); -} - -void ResilientConnection::popEvent() -{ - impl->popEvent(); -} - -bool ResilientConnection::createSession(const char* name, void* sessionContext, SessionHandle& handle) -{ - return impl->createSession(name, sessionContext, handle); -} - -void ResilientConnection::destroySession(SessionHandle handle) -{ - impl->destroySession(handle); -} - -void ResilientConnection::sendMessage(SessionHandle handle, qmf::engine::Message& message) -{ - impl->sendMessage(handle, message); -} - -void ResilientConnection::declareQueue(SessionHandle handle, char* queue) -{ - impl->declareQueue(handle, queue); -} - -void ResilientConnection::deleteQueue(SessionHandle handle, char* queue) -{ - impl->deleteQueue(handle, queue); -} - -void ResilientConnection::bind(SessionHandle handle, char* exchange, char* queue, char* key) -{ - impl->bind(handle, exchange, queue, key); -} - -void ResilientConnection::unbind(SessionHandle handle, char* exchange, char* queue, char* key) -{ - impl->unbind(handle, exchange, queue, key); -} - -void ResilientConnection::setNotifyFd(int fd) -{ - impl->setNotifyFd(fd); -} - -void ResilientConnection::notify() -{ - impl->notify(); -} - |