summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-08-21 23:35:23 +0000
committerAlan Conway <aconway@apache.org>2007-08-21 23:35:23 +0000
commitaf1fa9365f9166e129bbcc44c0706e41c9afa775 (patch)
treee351341d93e1f1ec1c843307c1e132ec856186b4 /qpid/cpp/src
parent05fe12bb0ba1bcdb7e608567f1f8baeaf3f3431b (diff)
downloadqpid-python-af1fa9365f9166e129bbcc44c0706e41c9afa775.tar.gz
* src/qpid/sys/Serializer.h, .cpp:
Template Serializer on functor for execute(). Old Serializer equivalent to Serializer<boost::function<void()> > * src/qpid/broker/BrokerQueue.h, .cpp: Use hand-written functor for Serializer instead of boost::function. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@568332 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/BrokerQueue.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/BrokerQueue.h10
-rw-r--r--qpid/cpp/src/qpid/sys/Serializer.cpp71
-rw-r--r--qpid/cpp/src/qpid/sys/Serializer.h130
-rw-r--r--qpid/cpp/src/tests/Serializer.cpp17
5 files changed, 128 insertions, 102 deletions
diff --git a/qpid/cpp/src/qpid/broker/BrokerQueue.cpp b/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
index 706179fb52..5ff9f950eb 100644
--- a/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
+++ b/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
@@ -50,7 +50,7 @@ Queue::Queue(const string& _name, bool _autodelete,
exclusive(0),
persistenceId(0),
serializer(false),
- dispatchCallback(boost::bind(&Queue::dispatch, this))
+ dispatchCallback(*this)
{
}
diff --git a/qpid/cpp/src/qpid/broker/BrokerQueue.h b/qpid/cpp/src/qpid/broker/BrokerQueue.h
index 35aa954c1e..962c11d8ee 100644
--- a/qpid/cpp/src/qpid/broker/BrokerQueue.h
+++ b/qpid/cpp/src/qpid/broker/BrokerQueue.h
@@ -60,6 +60,12 @@ namespace qpid {
typedef std::vector<Consumer*> Consumers;
typedef std::deque<Message::shared_ptr> Messages;
+ struct DispatchFunctor {
+ Queue& queue;
+ DispatchFunctor(Queue& q) : queue(q) {}
+ void operator()() { queue.dispatch(); }
+ };
+
const string name;
const bool autodelete;
MessageStore* const store;
@@ -75,8 +81,8 @@ namespace qpid {
std::auto_ptr<QueuePolicy> policy;
QueueBindings bindings;
boost::shared_ptr<Exchange> alternateExchange;
- qpid::sys::Serializer serializer;
- qpid::sys::Serializer::Task dispatchCallback;
+ qpid::sys::Serializer<DispatchFunctor> serializer;
+ DispatchFunctor dispatchCallback;
void pop();
void push(Message::shared_ptr& msg);
diff --git a/qpid/cpp/src/qpid/sys/Serializer.cpp b/qpid/cpp/src/qpid/sys/Serializer.cpp
index faf94a0f93..76dfaa6f6a 100644
--- a/qpid/cpp/src/qpid/sys/Serializer.cpp
+++ b/qpid/cpp/src/qpid/sys/Serializer.cpp
@@ -29,14 +29,14 @@
namespace qpid {
namespace sys {
-Serializer::Serializer(bool allowImmediate, Task notifyDispatchFn)
+SerializerBase::SerializerBase(bool allowImmediate, VoidFn0 notifyDispatchFn)
: state(IDLE), immediate(allowImmediate), notifyDispatch(notifyDispatchFn)
{
if (notifyDispatch.empty())
- notifyDispatch = boost::bind(&Serializer::notifyWorker, this);
+ notifyDispatch = boost::bind(&SerializerBase::notifyWorker, this);
}
-Serializer::~Serializer() {
+SerializerBase::~SerializerBase() {
{
Mutex::ScopedLock l(lock);
state = SHUTDOWN;
@@ -46,75 +46,14 @@ Serializer::~Serializer() {
worker.join();
}
-void Serializer::dispatch(Task& task) {
- Mutex::ScopedUnlock u(lock);
- // Preconditions: lock is held, state is EXECUTING or DISPATCHING
- assert(state != IDLE);
- assert(state != SHUTDOWN);
- assert(state == EXECUTING || state == DISPATCHING);
- try {
- task();
- } catch (const std::exception& e) {
- QPID_LOG(critical, "Unexpected exception in Serializer::dispatch"
- << e.what());
- assert(0); // Should not happen.
- } catch (...) {
- QPID_LOG(critical, "Unexpected exception in Serializer::dispatch.");
- assert(0); // Should not happen.
- }
-}
-
-void Serializer::execute(Task& task) {
- bool needNotify = false;
- {
- Mutex::ScopedLock l(lock);
- assert(state != SHUTDOWN);
- if (immediate && state == IDLE) {
- state = EXECUTING;
- dispatch(task);
- if (state != SHUTDOWN) {
- assert(state == EXECUTING);
- state = IDLE;
- }
- }
- else
- queue.push_back(task);
-
- if (!queue.empty() && state == IDLE) {
- state = DISPATCHING;
- needNotify = true;
- }
- }
- if (needNotify)
- notifyDispatch(); // Not my function, call outside lock.
-}
-
-void Serializer::dispatch() {
- Mutex::ScopedLock l(lock);
- // TODO aconway 2007-07-16: This loop could be unbounded
- // if other threads add work while we're in dispatch(Task&).
- // If we need to bound it we could dispatch just the elements
- // that were enqueued when dispatch() was first called - save
- // begin() iterator and pop only up to that.
- while (!queue.empty() && state != SHUTDOWN) {
- assert(state == DISPATCHING);
- dispatch(queue.front());
- queue.pop_front();
- }
- if (state != SHUTDOWN) {
- assert(state == DISPATCHING);
- state = IDLE;
- }
-}
-
-void Serializer::notifyWorker() {
+void SerializerBase::notifyWorker() {
if (!worker.id())
worker = Thread(*this);
else
lock.notify();
}
-void Serializer::run() {
+void SerializerBase::run() {
Mutex::ScopedLock l(lock);
while (state != SHUTDOWN) {
dispatch();
diff --git a/qpid/cpp/src/qpid/sys/Serializer.h b/qpid/cpp/src/qpid/sys/Serializer.h
index 337686cca0..085d51d7e2 100644
--- a/qpid/cpp/src/qpid/sys/Serializer.h
+++ b/qpid/cpp/src/qpid/sys/Serializer.h
@@ -36,29 +36,69 @@
namespace qpid {
namespace sys {
+/** Abstract base class for Serializer below. */
+class SerializerBase : private boost::noncopyable, private Runnable
+{
+ public:
+ typedef boost::function<void()> VoidFn0;
+ /** @see Serializer::Serializer */
+ SerializerBase(bool immediate=true, VoidFn0 notifyDispatch=VoidFn0());
+
+ virtual ~SerializerBase();
+
+ virtual void dispatch() = 0;
+ protected:
+ enum State {
+ IDLE, ///< No threads are active.
+ EXECUTING, ///< execute() is executing a single task.
+ DISPATCHING, ///< dispatch() is draining the queue.
+ SHUTDOWN ///< SerailizerBase is being destroyed.
+ };
+
+ void notifyWorker();
+ void run();
+ virtual bool empty() = 0;
+
+ Monitor lock;
+ State state;
+ bool immediate;
+ Thread worker;
+ boost::function<void()> notifyDispatch;
+};
+
+
/**
* Execute tasks sequentially, queuing tasks when necessary to
* ensure only one thread at a time executes a task and tasks
* are executed in order.
+ *
+ * Task is a void returning 0-arg functor. It must not throw exceptions.
+ *
+ * Note we deliberately do not use boost::function as the task type
+ * because copying a boost::functor allocates the target object on the
+ * heap.
*/
-class Serializer : private boost::noncopyable, private Runnable
-{
- public:
- typedef boost::function<void()> Task;
+template <class Task>
+class Serializer : public SerializerBase {
+
+ std::deque<Task> queue;
+ bool empty() { return queue.empty(); }
+ void dispatch(Task& task);
+
+ public:
/** Start a serializer.
*
* @param notifyDispatch Called when work is pending and there is no
* active dispatch thread. Must arrange for dispatch() to be called
* in some thread other than the calling thread and return.
- * By default the Serializer supplies its own dispatch thread.
+ * By default the Serailizer supplies its own dispatch thread.
*
* @param immediate Allow execute() to execute a task immediatly
* in the current thread.
*/
- Serializer(bool immediate=true, Task notifyDispatch=Task());
-
- ~Serializer();
+ Serializer(bool immediate=true, VoidFn0 notifyDispatch=VoidFn0())
+ : SerializerBase(immediate, notifyDispatch) {}
/**
* Task may be executed immediately in the calling thread if there
@@ -68,33 +108,73 @@ class Serializer : private boost::noncopyable, private Runnable
*/
void execute(Task& task);
+
/** Execute pending tasks sequentially in calling thread.
* Drains the task queue and returns, does not block for more tasks.
*
* @exception ShutdownException if the serializer is being destroyed.
*/
void dispatch();
-
- private:
- enum State {
- IDLE, ///< No threads are active.
- EXECUTING, ///< execute() is executing a single task.
- DISPATCHING, ///< dispatch() is draining the queue.
- SHUTDOWN ///< Serializer is being destroyed.
};
- void dispatch(Task&);
- void notifyWorker();
- void run();
- Monitor lock;
+template <class Task>
+void Serializer<Task>::execute(Task& task) {
+ bool needNotify = false;
+ {
+ Mutex::ScopedLock l(lock);
+ assert(state != SHUTDOWN);
+ if (immediate && state == IDLE) {
+ state = EXECUTING;
+ dispatch(task);
+ if (state != SHUTDOWN) {
+ assert(state == EXECUTING);
+ state = IDLE;
+ }
+ }
+ else
+ queue.push_back(task);
+ if (!queue.empty() && state == IDLE) {
+ state = DISPATCHING;
+ needNotify = true;
+ }
+ }
+ if (needNotify)
+ notifyDispatch(); // Not my function, call outside lock.
+}
+
+template <class Task>
+void Serializer<Task>::dispatch() {
+ Mutex::ScopedLock l(lock);
+ // TODO aconway 2007-07-16: This loop could be unbounded
+ // if other threads add work while we're in dispatch(Task&).
+ // If we need to bound it we could dispatch just the elements
+ // that were enqueued when dispatch() was first called - save
+ // begin() iterator and pop only up to that.
+ while (!queue.empty() && state != SHUTDOWN) {
+ assert(state == DISPATCHING);
+ dispatch(queue.front());
+ queue.pop_front();
+ }
+ if (state != SHUTDOWN) {
+ assert(state == DISPATCHING);
+ state = IDLE;
+ }
+}
+
+template <class Task>
+void Serializer<Task>::dispatch(Task& task) {
+ Mutex::ScopedUnlock u(lock);
+ // Preconditions: lock is held, state is EXECUTING or DISPATCHING
+ assert(state != IDLE);
+ assert(state != SHUTDOWN);
+ assert(state == EXECUTING || state == DISPATCHING);
+ // No exceptions allowed in task.
+ try { task(); } catch (...) { assert(0); }
+}
+
+
- State state;
- bool immediate;
- std::deque<Task> queue;
- Thread worker;
- Task notifyDispatch;
-};
}} // namespace qpid::sys
diff --git a/qpid/cpp/src/tests/Serializer.cpp b/qpid/cpp/src/tests/Serializer.cpp
index d7345acf06..0135822275 100644
--- a/qpid/cpp/src/tests/Serializer.cpp
+++ b/qpid/cpp/src/tests/Serializer.cpp
@@ -38,6 +38,7 @@ using namespace qpid::sys;
using namespace qpid::framing;
using namespace std;
+typedef Serializer<boost::function<void()> > BoostFunctionSerializer;
/** Test for concurrent calls */
struct Tester {
@@ -61,7 +62,7 @@ struct Tester {
}
};
-void execute(Serializer& s, Serializer::Task t)
+void execute(BoostFunctionSerializer& s, boost::function<void()> t)
{
s.execute(t);
}
@@ -69,7 +70,7 @@ void execute(Serializer& s, Serializer::Task t)
BOOST_AUTO_TEST_CASE(testSingleThread) {
// Verify that we call in the same thread by default.
Tester tester;
- Serializer s;
+ BoostFunctionSerializer s;
for (int i = 0; i < 100; ++i)
execute(s, boost::bind(&Tester::test, &tester));
// All should be executed in this thread.
@@ -83,7 +84,7 @@ BOOST_AUTO_TEST_CASE(testSingleThread) {
BOOST_AUTO_TEST_CASE(testSingleThreadNoImmediate) {
// Verify that we call in different thread if immediate=false.
Tester tester;
- Serializer s(false);
+ BoostFunctionSerializer s(false);
for (int i = 0; i < 100; ++i)
execute(s, boost::bind(&Tester::test, &tester));
{
@@ -99,13 +100,13 @@ BOOST_AUTO_TEST_CASE(testSingleThreadNoImmediate) {
}
struct Caller : public Runnable, public Tester {
- Caller(Serializer& s) : serializer(s) {}
+ Caller(BoostFunctionSerializer& s) : serializer(s) {}
void run() { execute(serializer, boost::bind(&Tester::test, this)); }
- Serializer& serializer;
+ BoostFunctionSerializer& serializer;
};
BOOST_AUTO_TEST_CASE(testDispatchThread) {
- Serializer s;
+ BoostFunctionSerializer s;
Caller caller(s);
Thread threads[100];
// Concurrent calls.
@@ -121,7 +122,7 @@ BOOST_AUTO_TEST_CASE(testDispatchThread) {
}
-std::auto_ptr<Serializer> serializer;
+std::auto_ptr<BoostFunctionSerializer> serializer;
struct CallDispatch : public Runnable {
void run() {
@@ -136,7 +137,7 @@ void notifyDispatch() {
// Use externally created threads.
BOOST_AUTO_TEST_CASE(testExternalDispatch) {
- serializer.reset(new Serializer(false, &notifyDispatch));
+ serializer.reset(new BoostFunctionSerializer(false, &notifyDispatch));
Tester tester;
for (int i = 0; i < 100; ++i)
execute(*serializer, boost::bind(&Tester::test, &tester));