diff options
Diffstat (limited to 'cpp/src/qpid/client/Dispatcher.cpp')
-rw-r--r-- | cpp/src/qpid/client/Dispatcher.cpp | 80 |
1 files changed, 36 insertions, 44 deletions
diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp index 65756f6404..fd6a18b349 100644 --- a/cpp/src/qpid/client/Dispatcher.cpp +++ b/cpp/src/qpid/client/Dispatcher.cpp @@ -27,6 +27,8 @@ #include "qpid/sys/BlockingQueue.h" #include "Message.h" +#include <boost/state_saver.hpp> + using qpid::framing::FrameSet; using qpid::framing::MessageTransferBody; using qpid::sys::Mutex; @@ -36,23 +38,22 @@ using qpid::sys::Thread; namespace qpid { namespace client { - Subscriber::Subscriber(Session_0_10& s, MessageListener* l, bool a, uint f) : session(s), listener(l), autoAck(a), ackBatchSize(f), count(0) {} +Subscriber::Subscriber(Session_0_10& s, MessageListener* l, AckPolicy a) : session(s), listener(l), autoAck(a) {} void Subscriber::received(Message& msg) { if (listener) { listener->received(msg); - if (autoAck) { - bool send = (++count >= ackBatchSize); - msg.acknowledge(session, true, send); - if (send) count = 0; - } + autoAck.ack(msg); } } - - Dispatcher::Dispatcher(Session_0_10& s, const std::string& q) : session(s), queue(q), running(false), stopped(false) +Dispatcher::Dispatcher(Session_0_10& s, const std::string& q) + : session(s), running(false) { + queue = q.empty() ? + session.execution().getDemux().getDefault() : + session.execution().getDemux().get(q); } void Dispatcher::start() @@ -62,19 +63,22 @@ void Dispatcher::start() void Dispatcher::run() { - Demux::QueuePtr q = queue.empty() ? - session.execution().getDemux().getDefault() : - session.execution().getDemux().get(queue); - - startRunning(); - stopped = false; - while (!isStopped()) { - FrameSet::shared_ptr content = q->pop(); + Mutex::ScopedLock l(lock); + if (running) + throw Exception("Dispatcher is already running."); + boost::state_saver<bool> reset(running); // Reset to false on exit. + running = true; + queue->open(); + while (!queue->isClosed()) { + Mutex::ScopedUnlock u(lock); + FrameSet::shared_ptr content = queue->pop(); if (content->isA<MessageTransferBody>()) { - Message msg(*content); + Message msg(*content, session); Subscriber::shared_ptr listener = find(msg.getDestination()); if (!listener) { - QPID_LOG(error, "No message listener set: " << content->getMethod()); + // FIXME aconway 2007-11-07: Should close session & throw here? + QPID_LOG(error, "No message listener for " + << content->getMethod()); } else { listener->received(msg); } @@ -82,41 +86,23 @@ void Dispatcher::run() if (handler.get()) { handler->handle(*content); } else { + // FIXME aconway 2007-11-07: Should close session & throw here? QPID_LOG(error, "Unhandled method: " << content->getMethod()); } } } - stopRunning(); } void Dispatcher::stop() { ScopedLock<Mutex> l(lock); - stopped = true; -} - -bool Dispatcher::isStopped() -{ - ScopedLock<Mutex> l(lock); - return stopped; -} - -/** - * Prevent concurrent threads invoking run. - */ -void Dispatcher::startRunning() -{ - ScopedLock<Mutex> l(lock); - if (running) { - throw Exception("Dispatcher is already running."); - } - running = true; + queue->close(); // Will interrupt thread blocked in pop() } -void Dispatcher::stopRunning() +void Dispatcher::setAutoStop(bool b) { ScopedLock<Mutex> l(lock); - running = false; + autoStop = b; } Subscriber::shared_ptr Dispatcher::find(const std::string& name) @@ -129,22 +115,28 @@ Subscriber::shared_ptr Dispatcher::find(const std::string& name) return i->second; } -void Dispatcher::listen(MessageListener* listener, bool autoAck, uint ackBatchSize) +void Dispatcher::listen( + MessageListener* listener, AckPolicy autoAck +) { ScopedLock<Mutex> l(lock); - defaultListener = Subscriber::shared_ptr(new Subscriber(session, listener, autoAck, ackBatchSize)); + defaultListener = Subscriber::shared_ptr( + new Subscriber(session, listener, autoAck)); } -void Dispatcher::listen(const std::string& destination, MessageListener* listener, bool autoAck, uint ackBatchSize) +void Dispatcher::listen(const std::string& destination, MessageListener* listener, AckPolicy autoAck) { ScopedLock<Mutex> l(lock); - listeners[destination] = Subscriber::shared_ptr(new Subscriber(session, listener, autoAck, ackBatchSize)); + listeners[destination] = Subscriber::shared_ptr( + new Subscriber(session, listener, autoAck)); } void Dispatcher::cancel(const std::string& destination) { ScopedLock<Mutex> l(lock); listeners.erase(destination); + if (autoStop && listeners.empty()) + queue->close(); } }} |