diff options
author | Alan Conway <aconway@apache.org> | 2007-08-21 23:35:23 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-08-21 23:35:23 +0000 |
commit | af1fa9365f9166e129bbcc44c0706e41c9afa775 (patch) | |
tree | e351341d93e1f1ec1c843307c1e132ec856186b4 /qpid/cpp/src | |
parent | 05fe12bb0ba1bcdb7e608567f1f8baeaf3f3431b (diff) | |
download | qpid-python-af1fa9365f9166e129bbcc44c0706e41c9afa775.tar.gz |
* src/qpid/sys/Serializer.h, .cpp:
Template Serializer on functor for execute().
Old Serializer equivalent to Serializer<boost::function<void()> >
* src/qpid/broker/BrokerQueue.h, .cpp:
Use hand-written functor for Serializer instead of boost::function.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@568332 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/qpid/broker/BrokerQueue.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/BrokerQueue.h | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/Serializer.cpp | 71 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/Serializer.h | 130 | ||||
-rw-r--r-- | qpid/cpp/src/tests/Serializer.cpp | 17 |
5 files changed, 128 insertions, 102 deletions
diff --git a/qpid/cpp/src/qpid/broker/BrokerQueue.cpp b/qpid/cpp/src/qpid/broker/BrokerQueue.cpp index 706179fb52..5ff9f950eb 100644 --- a/qpid/cpp/src/qpid/broker/BrokerQueue.cpp +++ b/qpid/cpp/src/qpid/broker/BrokerQueue.cpp @@ -50,7 +50,7 @@ Queue::Queue(const string& _name, bool _autodelete, exclusive(0), persistenceId(0), serializer(false), - dispatchCallback(boost::bind(&Queue::dispatch, this)) + dispatchCallback(*this) { } diff --git a/qpid/cpp/src/qpid/broker/BrokerQueue.h b/qpid/cpp/src/qpid/broker/BrokerQueue.h index 35aa954c1e..962c11d8ee 100644 --- a/qpid/cpp/src/qpid/broker/BrokerQueue.h +++ b/qpid/cpp/src/qpid/broker/BrokerQueue.h @@ -60,6 +60,12 @@ namespace qpid { typedef std::vector<Consumer*> Consumers; typedef std::deque<Message::shared_ptr> Messages; + struct DispatchFunctor { + Queue& queue; + DispatchFunctor(Queue& q) : queue(q) {} + void operator()() { queue.dispatch(); } + }; + const string name; const bool autodelete; MessageStore* const store; @@ -75,8 +81,8 @@ namespace qpid { std::auto_ptr<QueuePolicy> policy; QueueBindings bindings; boost::shared_ptr<Exchange> alternateExchange; - qpid::sys::Serializer serializer; - qpid::sys::Serializer::Task dispatchCallback; + qpid::sys::Serializer<DispatchFunctor> serializer; + DispatchFunctor dispatchCallback; void pop(); void push(Message::shared_ptr& msg); diff --git a/qpid/cpp/src/qpid/sys/Serializer.cpp b/qpid/cpp/src/qpid/sys/Serializer.cpp index faf94a0f93..76dfaa6f6a 100644 --- a/qpid/cpp/src/qpid/sys/Serializer.cpp +++ b/qpid/cpp/src/qpid/sys/Serializer.cpp @@ -29,14 +29,14 @@ namespace qpid { namespace sys { -Serializer::Serializer(bool allowImmediate, Task notifyDispatchFn) +SerializerBase::SerializerBase(bool allowImmediate, VoidFn0 notifyDispatchFn) : state(IDLE), immediate(allowImmediate), notifyDispatch(notifyDispatchFn) { if (notifyDispatch.empty()) - notifyDispatch = boost::bind(&Serializer::notifyWorker, this); + notifyDispatch = boost::bind(&SerializerBase::notifyWorker, this); } -Serializer::~Serializer() { +SerializerBase::~SerializerBase() { { Mutex::ScopedLock l(lock); state = SHUTDOWN; @@ -46,75 +46,14 @@ Serializer::~Serializer() { worker.join(); } -void Serializer::dispatch(Task& task) { - Mutex::ScopedUnlock u(lock); - // Preconditions: lock is held, state is EXECUTING or DISPATCHING - assert(state != IDLE); - assert(state != SHUTDOWN); - assert(state == EXECUTING || state == DISPATCHING); - try { - task(); - } catch (const std::exception& e) { - QPID_LOG(critical, "Unexpected exception in Serializer::dispatch" - << e.what()); - assert(0); // Should not happen. - } catch (...) { - QPID_LOG(critical, "Unexpected exception in Serializer::dispatch."); - assert(0); // Should not happen. - } -} - -void Serializer::execute(Task& task) { - bool needNotify = false; - { - Mutex::ScopedLock l(lock); - assert(state != SHUTDOWN); - if (immediate && state == IDLE) { - state = EXECUTING; - dispatch(task); - if (state != SHUTDOWN) { - assert(state == EXECUTING); - state = IDLE; - } - } - else - queue.push_back(task); - - if (!queue.empty() && state == IDLE) { - state = DISPATCHING; - needNotify = true; - } - } - if (needNotify) - notifyDispatch(); // Not my function, call outside lock. -} - -void Serializer::dispatch() { - Mutex::ScopedLock l(lock); - // TODO aconway 2007-07-16: This loop could be unbounded - // if other threads add work while we're in dispatch(Task&). - // If we need to bound it we could dispatch just the elements - // that were enqueued when dispatch() was first called - save - // begin() iterator and pop only up to that. - while (!queue.empty() && state != SHUTDOWN) { - assert(state == DISPATCHING); - dispatch(queue.front()); - queue.pop_front(); - } - if (state != SHUTDOWN) { - assert(state == DISPATCHING); - state = IDLE; - } -} - -void Serializer::notifyWorker() { +void SerializerBase::notifyWorker() { if (!worker.id()) worker = Thread(*this); else lock.notify(); } -void Serializer::run() { +void SerializerBase::run() { Mutex::ScopedLock l(lock); while (state != SHUTDOWN) { dispatch(); diff --git a/qpid/cpp/src/qpid/sys/Serializer.h b/qpid/cpp/src/qpid/sys/Serializer.h index 337686cca0..085d51d7e2 100644 --- a/qpid/cpp/src/qpid/sys/Serializer.h +++ b/qpid/cpp/src/qpid/sys/Serializer.h @@ -36,29 +36,69 @@ namespace qpid { namespace sys { +/** Abstract base class for Serializer below. */ +class SerializerBase : private boost::noncopyable, private Runnable +{ + public: + typedef boost::function<void()> VoidFn0; + /** @see Serializer::Serializer */ + SerializerBase(bool immediate=true, VoidFn0 notifyDispatch=VoidFn0()); + + virtual ~SerializerBase(); + + virtual void dispatch() = 0; + protected: + enum State { + IDLE, ///< No threads are active. + EXECUTING, ///< execute() is executing a single task. + DISPATCHING, ///< dispatch() is draining the queue. + SHUTDOWN ///< SerailizerBase is being destroyed. + }; + + void notifyWorker(); + void run(); + virtual bool empty() = 0; + + Monitor lock; + State state; + bool immediate; + Thread worker; + boost::function<void()> notifyDispatch; +}; + + /** * Execute tasks sequentially, queuing tasks when necessary to * ensure only one thread at a time executes a task and tasks * are executed in order. + * + * Task is a void returning 0-arg functor. It must not throw exceptions. + * + * Note we deliberately do not use boost::function as the task type + * because copying a boost::functor allocates the target object on the + * heap. */ -class Serializer : private boost::noncopyable, private Runnable -{ - public: - typedef boost::function<void()> Task; +template <class Task> +class Serializer : public SerializerBase { + + std::deque<Task> queue; + bool empty() { return queue.empty(); } + void dispatch(Task& task); + + public: /** Start a serializer. * * @param notifyDispatch Called when work is pending and there is no * active dispatch thread. Must arrange for dispatch() to be called * in some thread other than the calling thread and return. - * By default the Serializer supplies its own dispatch thread. + * By default the Serailizer supplies its own dispatch thread. * * @param immediate Allow execute() to execute a task immediatly * in the current thread. */ - Serializer(bool immediate=true, Task notifyDispatch=Task()); - - ~Serializer(); + Serializer(bool immediate=true, VoidFn0 notifyDispatch=VoidFn0()) + : SerializerBase(immediate, notifyDispatch) {} /** * Task may be executed immediately in the calling thread if there @@ -68,33 +108,73 @@ class Serializer : private boost::noncopyable, private Runnable */ void execute(Task& task); + /** Execute pending tasks sequentially in calling thread. * Drains the task queue and returns, does not block for more tasks. * * @exception ShutdownException if the serializer is being destroyed. */ void dispatch(); - - private: - enum State { - IDLE, ///< No threads are active. - EXECUTING, ///< execute() is executing a single task. - DISPATCHING, ///< dispatch() is draining the queue. - SHUTDOWN ///< Serializer is being destroyed. }; - void dispatch(Task&); - void notifyWorker(); - void run(); - Monitor lock; +template <class Task> +void Serializer<Task>::execute(Task& task) { + bool needNotify = false; + { + Mutex::ScopedLock l(lock); + assert(state != SHUTDOWN); + if (immediate && state == IDLE) { + state = EXECUTING; + dispatch(task); + if (state != SHUTDOWN) { + assert(state == EXECUTING); + state = IDLE; + } + } + else + queue.push_back(task); + if (!queue.empty() && state == IDLE) { + state = DISPATCHING; + needNotify = true; + } + } + if (needNotify) + notifyDispatch(); // Not my function, call outside lock. +} + +template <class Task> +void Serializer<Task>::dispatch() { + Mutex::ScopedLock l(lock); + // TODO aconway 2007-07-16: This loop could be unbounded + // if other threads add work while we're in dispatch(Task&). + // If we need to bound it we could dispatch just the elements + // that were enqueued when dispatch() was first called - save + // begin() iterator and pop only up to that. + while (!queue.empty() && state != SHUTDOWN) { + assert(state == DISPATCHING); + dispatch(queue.front()); + queue.pop_front(); + } + if (state != SHUTDOWN) { + assert(state == DISPATCHING); + state = IDLE; + } +} + +template <class Task> +void Serializer<Task>::dispatch(Task& task) { + Mutex::ScopedUnlock u(lock); + // Preconditions: lock is held, state is EXECUTING or DISPATCHING + assert(state != IDLE); + assert(state != SHUTDOWN); + assert(state == EXECUTING || state == DISPATCHING); + // No exceptions allowed in task. + try { task(); } catch (...) { assert(0); } +} + + - State state; - bool immediate; - std::deque<Task> queue; - Thread worker; - Task notifyDispatch; -}; }} // namespace qpid::sys diff --git a/qpid/cpp/src/tests/Serializer.cpp b/qpid/cpp/src/tests/Serializer.cpp index d7345acf06..0135822275 100644 --- a/qpid/cpp/src/tests/Serializer.cpp +++ b/qpid/cpp/src/tests/Serializer.cpp @@ -38,6 +38,7 @@ using namespace qpid::sys; using namespace qpid::framing; using namespace std; +typedef Serializer<boost::function<void()> > BoostFunctionSerializer; /** Test for concurrent calls */ struct Tester { @@ -61,7 +62,7 @@ struct Tester { } }; -void execute(Serializer& s, Serializer::Task t) +void execute(BoostFunctionSerializer& s, boost::function<void()> t) { s.execute(t); } @@ -69,7 +70,7 @@ void execute(Serializer& s, Serializer::Task t) BOOST_AUTO_TEST_CASE(testSingleThread) { // Verify that we call in the same thread by default. Tester tester; - Serializer s; + BoostFunctionSerializer s; for (int i = 0; i < 100; ++i) execute(s, boost::bind(&Tester::test, &tester)); // All should be executed in this thread. @@ -83,7 +84,7 @@ BOOST_AUTO_TEST_CASE(testSingleThread) { BOOST_AUTO_TEST_CASE(testSingleThreadNoImmediate) { // Verify that we call in different thread if immediate=false. Tester tester; - Serializer s(false); + BoostFunctionSerializer s(false); for (int i = 0; i < 100; ++i) execute(s, boost::bind(&Tester::test, &tester)); { @@ -99,13 +100,13 @@ BOOST_AUTO_TEST_CASE(testSingleThreadNoImmediate) { } struct Caller : public Runnable, public Tester { - Caller(Serializer& s) : serializer(s) {} + Caller(BoostFunctionSerializer& s) : serializer(s) {} void run() { execute(serializer, boost::bind(&Tester::test, this)); } - Serializer& serializer; + BoostFunctionSerializer& serializer; }; BOOST_AUTO_TEST_CASE(testDispatchThread) { - Serializer s; + BoostFunctionSerializer s; Caller caller(s); Thread threads[100]; // Concurrent calls. @@ -121,7 +122,7 @@ BOOST_AUTO_TEST_CASE(testDispatchThread) { } -std::auto_ptr<Serializer> serializer; +std::auto_ptr<BoostFunctionSerializer> serializer; struct CallDispatch : public Runnable { void run() { @@ -136,7 +137,7 @@ void notifyDispatch() { // Use externally created threads. BOOST_AUTO_TEST_CASE(testExternalDispatch) { - serializer.reset(new Serializer(false, ¬ifyDispatch)); + serializer.reset(new BoostFunctionSerializer(false, ¬ifyDispatch)); Tester tester; for (int i = 0; i < 100; ++i) execute(*serializer, boost::bind(&Tester::test, &tester)); |