diff options
Diffstat (limited to 'cpp/src/qpid/client/Dispatcher.cpp')
-rw-r--r-- | cpp/src/qpid/client/Dispatcher.cpp | 91 |
1 files changed, 48 insertions, 43 deletions
diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp index 5028d68405..a715c623bf 100644 --- a/cpp/src/qpid/client/Dispatcher.cpp +++ b/cpp/src/qpid/client/Dispatcher.cpp @@ -18,15 +18,25 @@ * under the License. * */ -#include "Dispatcher.h" +#include "qpid/client/Dispatcher.h" +#include "qpid/client/SubscriptionImpl.h" +#include "qpid/client/SessionImpl.h" #include "qpid/framing/FrameSet.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Statement.h" #include "qpid/sys/BlockingQueue.h" -#include "Message.h" - -#include <boost/state_saver.hpp> +#include "qpid/client/Message.h" +#include "qpid/client/MessageImpl.h" + +#include <boost/version.hpp> +#if (BOOST_VERSION >= 104000) +# include <boost/serialization/state_saver.hpp> + using boost::serialization::state_saver; +#else +# include <boost/state_saver.hpp> + using boost::state_saver; +#endif /* BOOST_VERSION */ using qpid::framing::FrameSet; using qpid::framing::MessageTransferBody; @@ -37,44 +47,40 @@ using qpid::sys::Thread; namespace qpid { namespace client { -Subscriber::Subscriber(const Session& s, MessageListener* l, AckPolicy a) - : session(s), listener(l), autoAck(a) {} - -void Subscriber::received(Message& msg) -{ - if (listener) { - listener->received(msg); - autoAck.ack(msg, session); - } -} - Dispatcher::Dispatcher(const Session& s, const std::string& q) - : session(s), running(false), autoStop(true) + : session(s), + running(false), + autoStop(true), + failoverHandler(0) { - queue = q.empty() ? - session.getExecution().getDemux().getDefault() : - session.getExecution().getDemux().get(q); -} + Demux& demux = SessionBase_0_10Access(session).get()->getDemux(); + queue = q.empty() ? demux.getDefault() : demux.get(q); +} void Dispatcher::start() { worker = Thread(this); } +void Dispatcher::wait() +{ + worker.join(); +} + void Dispatcher::run() { Mutex::ScopedLock l(lock); if (running) throw Exception("Dispatcher is already running."); - boost::state_saver<bool> reset(running); // Reset to false on exit. + state_saver<bool> reset(running); // Reset to false on exit. running = true; try { while (!queue->isClosed()) { Mutex::ScopedUnlock u(lock); FrameSet::shared_ptr content = queue->pop(); if (content->isA<MessageTransferBody>()) { - Message msg(*content); - Subscriber::shared_ptr listener = find(msg.getDestination()); + Message msg(new MessageImpl(*content)); + boost::intrusive_ptr<SubscriptionImpl> listener = find(msg.getDestination()); if (!listener) { QPID_LOG(error, "No listener found for destination " << msg.getDestination()); } else { @@ -91,9 +97,21 @@ void Dispatcher::run() } session.sync(); // Make sure all our acks are received before returning. } - catch (const ClosedException&) {} //ignore it and return + catch (const ClosedException&) { + QPID_LOG(debug, QPID_MSG(session.getId() << ": closed by peer")); + } + catch (const TransportFailure&) { + QPID_LOG(info, QPID_MSG(session.getId() << ": transport failure")); + throw; + } catch (const std::exception& e) { - QPID_LOG(error, "Exception in client dispatch thread: " << e.what()); + if ( failoverHandler ) { + QPID_LOG(debug, QPID_MSG(session.getId() << " failover: " << e.what())); + failoverHandler(); + } else { + QPID_LOG(error, session.getId() << " error: " << e.what()); + throw; + } } } @@ -109,7 +127,7 @@ void Dispatcher::setAutoStop(bool b) autoStop = b; } -Subscriber::shared_ptr Dispatcher::find(const std::string& name) +boost::intrusive_ptr<SubscriptionImpl> Dispatcher::find(const std::string& name) { ScopedLock<Mutex> l(lock); Listeners::iterator i = listeners.find(name); @@ -119,27 +137,14 @@ Subscriber::shared_ptr Dispatcher::find(const std::string& name) return i->second; } -void Dispatcher::listen( - MessageListener* listener, AckPolicy autoAck -) -{ +void Dispatcher::listen(const boost::intrusive_ptr<SubscriptionImpl>& subscription) { ScopedLock<Mutex> l(lock); - defaultListener = Subscriber::shared_ptr( - new Subscriber(session, listener, autoAck)); + listeners[subscription->getName()] = subscription; } -void Dispatcher::listen(const std::string& destination, MessageListener* listener, AckPolicy autoAck) -{ - ScopedLock<Mutex> l(lock); - listeners[destination] = Subscriber::shared_ptr( - new Subscriber(session, listener, autoAck)); -} - -void Dispatcher::cancel(const std::string& destination) -{ +void Dispatcher::cancel(const std::string& destination) { ScopedLock<Mutex> l(lock); - listeners.erase(destination); - if (autoStop && listeners.empty()) + if (listeners.erase(destination) && running && autoStop && listeners.empty()) queue->close(); } |