summaryrefslogtreecommitdiff
path: root/src/components/include/utils/threads/message_loop_thread.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/components/include/utils/threads/message_loop_thread.h')
-rw-r--r--src/components/include/utils/threads/message_loop_thread.h52
1 files changed, 19 insertions, 33 deletions
diff --git a/src/components/include/utils/threads/message_loop_thread.h b/src/components/include/utils/threads/message_loop_thread.h
index c01ebfd067..e051c48904 100644
--- a/src/components/include/utils/threads/message_loop_thread.h
+++ b/src/components/include/utils/threads/message_loop_thread.h
@@ -39,14 +39,13 @@
#include "utils/logger.h"
#include "utils/macro.h"
#include "utils/message_queue.h"
-#include "utils/threads/thread.h"
-#include "utils/shared_ptr.h"
+#include "utils/threads/thread_manager.h"
+#include "utils/lock.h"
namespace threads {
-/**
- * \class MessageLoopThread
- * \brief Handles a thread which sole purpose is to pump messages pushed
+/*
+ * Class that handles a thread which sole purpose is to pump messages pushed
* to it's queue. To handle messages someone, Handler must be implemented and
* passed to MessageLoopThread constructor.
*/
@@ -80,10 +79,6 @@ class MessageLoopThread {
// Places a message to the therad's queue. Thread-safe.
void PostMessage(const Message& message);
-
- // Process already posted messages and stop thread processing. Thread-safe.
- void Shutdown();
-
private:
/*
* Implementation of ThreadDelegate that actually pumps the queue and is
@@ -95,20 +90,19 @@ class MessageLoopThread {
// threads::ThreadDelegate overrides
virtual void threadMain() OVERRIDE;
- virtual void exitThreadMain() OVERRIDE;
-
+ virtual bool exitThreadMain() OVERRIDE;
private:
// Handle all messages that are in the queue until it is empty
void DrainQue();
+ private:
// Handler that processes messages
Handler& handler_;
// Message queue that is actually owned by MessageLoopThread
MessageQueue<Message, Queue>& message_queue_;
+ sync_primitives::Lock active_lock;
};
-
private:
MessageQueue<Message, Queue> message_queue_;
- LoopThreadDelegate* thread_delegate_;
threads::Thread* thread_;
};
@@ -118,10 +112,8 @@ template<class Q>
MessageLoopThread<Q>::MessageLoopThread(const std::string& name,
Handler* handler,
const ThreadOptions& thread_opts)
- : thread_delegate_(new LoopThreadDelegate(&message_queue_, handler)),
- thread_(threads::CreateThread(name.c_str(),
- thread_delegate_)) {
- const bool started = thread_->start(thread_opts);
+ : thread_(threads::CreateThread(name.c_str(), new LoopThreadDelegate(&message_queue_, handler))) {
+ bool started = thread_->startWithOptions(thread_opts);
if (!started) {
CREATE_LOGGERPTR_LOCAL(logger_, "Utils")
LOG4CXX_ERROR(logger_, "Failed to start thread " << name);
@@ -130,10 +122,7 @@ MessageLoopThread<Q>::MessageLoopThread(const std::string& name,
template<class Q>
MessageLoopThread<Q>::~MessageLoopThread() {
- Shutdown();
- thread_->join();
- delete thread_delegate_;
- threads::DeleteThread(thread_);
+ thread_->stop();
}
template <class Q>
@@ -141,11 +130,6 @@ void MessageLoopThread<Q>::PostMessage(const Message& message) {
message_queue_.push(message);
}
-template <class Q>
-void MessageLoopThread<Q>::Shutdown() {
- thread_->stop();
-}
-
//////////
template<class Q>
MessageLoopThread<Q>::LoopThreadDelegate::LoopThreadDelegate(
@@ -158,9 +142,8 @@ MessageLoopThread<Q>::LoopThreadDelegate::LoopThreadDelegate(
template<class Q>
void MessageLoopThread<Q>::LoopThreadDelegate::threadMain() {
- CREATE_LOGGERPTR_LOCAL(logger_, "Utils")
- LOG4CXX_AUTO_TRACE(logger_);
- while (!message_queue_.IsShuttingDown()) {
+ sync_primitives::AutoLock auto_lock(active_lock);
+ while(!message_queue_.IsShuttingDown()){
DrainQue();
message_queue_.wait();
}
@@ -169,15 +152,18 @@ void MessageLoopThread<Q>::LoopThreadDelegate::threadMain() {
}
template<class Q>
-void MessageLoopThread<Q>::LoopThreadDelegate::exitThreadMain() {
- CREATE_LOGGERPTR_LOCAL(logger_, "Utils")
- LOG4CXX_AUTO_TRACE(logger_);
+bool MessageLoopThread<Q>::LoopThreadDelegate::exitThreadMain() {
message_queue_.Shutdown();
+ {
+ sync_primitives::AutoLock auto_lock(active_lock);
+ // Prevent canceling thread until queue is drained
+ }
+ return true;
}
template<class Q>
void MessageLoopThread<Q>::LoopThreadDelegate::DrainQue() {
- while (!message_queue_.empty()) {
+ while(!message_queue_.empty()) {
handler_.Handle(message_queue_.pop());
}
}