summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/Dispatcher.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-11-07 19:57:46 +0000
committerAlan Conway <aconway@apache.org>2007-11-07 19:57:46 +0000
commit710b8a1f1285b9aa5bccee5b1906500667dd7bc5 (patch)
tree83005778c44cf7d897cef882ced2330bc8bd2228 /cpp/src/qpid/client/Dispatcher.cpp
parentd19657d82321b2b5e2cac386c49aa99f82b976fb (diff)
downloadqpid-python-710b8a1f1285b9aa5bccee5b1906500667dd7bc5.tar.gz
client::SubscriptionManager:
- Added autoStop support. - Added LocalQueue subscriptions. - Expose AckPolicy settings to user. client::Message: - incoming Messages carry their session for acknowledge perftest: (see perftest --help for details...) - allow multiple consumers. - 3 queue modes: shared, fanout, topic. - set size of messages git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@592869 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/Dispatcher.cpp')
-rw-r--r--cpp/src/qpid/client/Dispatcher.cpp80
1 files changed, 36 insertions, 44 deletions
diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp
index 65756f6404..fd6a18b349 100644
--- a/cpp/src/qpid/client/Dispatcher.cpp
+++ b/cpp/src/qpid/client/Dispatcher.cpp
@@ -27,6 +27,8 @@
#include "qpid/sys/BlockingQueue.h"
#include "Message.h"
+#include <boost/state_saver.hpp>
+
using qpid::framing::FrameSet;
using qpid::framing::MessageTransferBody;
using qpid::sys::Mutex;
@@ -36,23 +38,22 @@ using qpid::sys::Thread;
namespace qpid {
namespace client {
- Subscriber::Subscriber(Session_0_10& s, MessageListener* l, bool a, uint f) : session(s), listener(l), autoAck(a), ackBatchSize(f), count(0) {}
+Subscriber::Subscriber(Session_0_10& s, MessageListener* l, AckPolicy a) : session(s), listener(l), autoAck(a) {}
void Subscriber::received(Message& msg)
{
if (listener) {
listener->received(msg);
- if (autoAck) {
- bool send = (++count >= ackBatchSize);
- msg.acknowledge(session, true, send);
- if (send) count = 0;
- }
+ autoAck.ack(msg);
}
}
-
- Dispatcher::Dispatcher(Session_0_10& s, const std::string& q) : session(s), queue(q), running(false), stopped(false)
+Dispatcher::Dispatcher(Session_0_10& s, const std::string& q)
+ : session(s), running(false)
{
+ queue = q.empty() ?
+ session.execution().getDemux().getDefault() :
+ session.execution().getDemux().get(q);
}
void Dispatcher::start()
@@ -62,19 +63,22 @@ void Dispatcher::start()
void Dispatcher::run()
{
- Demux::QueuePtr q = queue.empty() ?
- session.execution().getDemux().getDefault() :
- session.execution().getDemux().get(queue);
-
- startRunning();
- stopped = false;
- while (!isStopped()) {
- FrameSet::shared_ptr content = q->pop();
+ Mutex::ScopedLock l(lock);
+ if (running)
+ throw Exception("Dispatcher is already running.");
+ boost::state_saver<bool> reset(running); // Reset to false on exit.
+ running = true;
+ queue->open();
+ while (!queue->isClosed()) {
+ Mutex::ScopedUnlock u(lock);
+ FrameSet::shared_ptr content = queue->pop();
if (content->isA<MessageTransferBody>()) {
- Message msg(*content);
+ Message msg(*content, session);
Subscriber::shared_ptr listener = find(msg.getDestination());
if (!listener) {
- QPID_LOG(error, "No message listener set: " << content->getMethod());
+ // FIXME aconway 2007-11-07: Should close session & throw here?
+ QPID_LOG(error, "No message listener for "
+ << content->getMethod());
} else {
listener->received(msg);
}
@@ -82,41 +86,23 @@ void Dispatcher::run()
if (handler.get()) {
handler->handle(*content);
} else {
+ // FIXME aconway 2007-11-07: Should close session & throw here?
QPID_LOG(error, "Unhandled method: " << content->getMethod());
}
}
}
- stopRunning();
}
void Dispatcher::stop()
{
ScopedLock<Mutex> l(lock);
- stopped = true;
-}
-
-bool Dispatcher::isStopped()
-{
- ScopedLock<Mutex> l(lock);
- return stopped;
-}
-
-/**
- * Prevent concurrent threads invoking run.
- */
-void Dispatcher::startRunning()
-{
- ScopedLock<Mutex> l(lock);
- if (running) {
- throw Exception("Dispatcher is already running.");
- }
- running = true;
+ queue->close(); // Will interrupt thread blocked in pop()
}
-void Dispatcher::stopRunning()
+void Dispatcher::setAutoStop(bool b)
{
ScopedLock<Mutex> l(lock);
- running = false;
+ autoStop = b;
}
Subscriber::shared_ptr Dispatcher::find(const std::string& name)
@@ -129,22 +115,28 @@ Subscriber::shared_ptr Dispatcher::find(const std::string& name)
return i->second;
}
-void Dispatcher::listen(MessageListener* listener, bool autoAck, uint ackBatchSize)
+void Dispatcher::listen(
+ MessageListener* listener, AckPolicy autoAck
+)
{
ScopedLock<Mutex> l(lock);
- defaultListener = Subscriber::shared_ptr(new Subscriber(session, listener, autoAck, ackBatchSize));
+ defaultListener = Subscriber::shared_ptr(
+ new Subscriber(session, listener, autoAck));
}
-void Dispatcher::listen(const std::string& destination, MessageListener* listener, bool autoAck, uint ackBatchSize)
+void Dispatcher::listen(const std::string& destination, MessageListener* listener, AckPolicy autoAck)
{
ScopedLock<Mutex> l(lock);
- listeners[destination] = Subscriber::shared_ptr(new Subscriber(session, listener, autoAck, ackBatchSize));
+ listeners[destination] = Subscriber::shared_ptr(
+ new Subscriber(session, listener, autoAck));
}
void Dispatcher::cancel(const std::string& destination)
{
ScopedLock<Mutex> l(lock);
listeners.erase(destination);
+ if (autoStop && listeners.empty())
+ queue->close();
}
}}