summaryrefslogtreecommitdiff
path: root/src/components/include/utils/threads/message_loop_thread.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/components/include/utils/threads/message_loop_thread.h')
-rw-r--r--src/components/include/utils/threads/message_loop_thread.h54
1 files changed, 35 insertions, 19 deletions
diff --git a/src/components/include/utils/threads/message_loop_thread.h b/src/components/include/utils/threads/message_loop_thread.h
index e051c4890..6f90df209 100644
--- a/src/components/include/utils/threads/message_loop_thread.h
+++ b/src/components/include/utils/threads/message_loop_thread.h
@@ -39,13 +39,16 @@
#include "utils/logger.h"
#include "utils/macro.h"
#include "utils/message_queue.h"
-#include "utils/threads/thread_manager.h"
-#include "utils/lock.h"
+#include "utils/threads/thread.h"
+#include "utils/shared_ptr.h"
namespace threads {
-/*
- * Class that handles a thread which sole purpose is to pump messages pushed
+using ::utils::MessageQueue;
+
+/**
+ * \class MessageLoopThread
+ * \brief Handles a thread which sole purpose is to pump messages pushed
* to it's queue. To handle messages someone, Handler must be implemented and
* passed to MessageLoopThread constructor.
*/
@@ -79,6 +82,10 @@ class MessageLoopThread {
// Places a message to the therad's queue. Thread-safe.
void PostMessage(const Message& message);
+
+ // Process already posted messages and stop thread processing. Thread-safe.
+ void Shutdown();
+
private:
/*
* Implementation of ThreadDelegate that actually pumps the queue and is
@@ -90,19 +97,20 @@ class MessageLoopThread {
// threads::ThreadDelegate overrides
virtual void threadMain() OVERRIDE;
- virtual bool exitThreadMain() OVERRIDE;
+ virtual void exitThreadMain() OVERRIDE;
+
private:
// Handle all messages that are in the queue until it is empty
void DrainQue();
- private:
// Handler that processes messages
Handler& handler_;
// Message queue that is actually owned by MessageLoopThread
MessageQueue<Message, Queue>& message_queue_;
- sync_primitives::Lock active_lock;
};
+
private:
MessageQueue<Message, Queue> message_queue_;
+ LoopThreadDelegate* thread_delegate_;
threads::Thread* thread_;
};
@@ -112,8 +120,10 @@ template<class Q>
MessageLoopThread<Q>::MessageLoopThread(const std::string& name,
Handler* handler,
const ThreadOptions& thread_opts)
- : thread_(threads::CreateThread(name.c_str(), new LoopThreadDelegate(&message_queue_, handler))) {
- bool started = thread_->startWithOptions(thread_opts);
+ : thread_delegate_(new LoopThreadDelegate(&message_queue_, handler)),
+ thread_(threads::CreateThread(name.c_str(),
+ thread_delegate_)) {
+ const bool started = thread_->start(thread_opts);
if (!started) {
CREATE_LOGGERPTR_LOCAL(logger_, "Utils")
LOG4CXX_ERROR(logger_, "Failed to start thread " << name);
@@ -122,7 +132,10 @@ MessageLoopThread<Q>::MessageLoopThread(const std::string& name,
template<class Q>
MessageLoopThread<Q>::~MessageLoopThread() {
- thread_->stop();
+ Shutdown();
+ thread_->join();
+ delete thread_delegate_;
+ threads::DeleteThread(thread_);
}
template <class Q>
@@ -130,6 +143,11 @@ void MessageLoopThread<Q>::PostMessage(const Message& message) {
message_queue_.push(message);
}
+template <class Q>
+void MessageLoopThread<Q>::Shutdown() {
+ thread_->stop();
+}
+
//////////
template<class Q>
MessageLoopThread<Q>::LoopThreadDelegate::LoopThreadDelegate(
@@ -142,8 +160,9 @@ MessageLoopThread<Q>::LoopThreadDelegate::LoopThreadDelegate(
template<class Q>
void MessageLoopThread<Q>::LoopThreadDelegate::threadMain() {
- sync_primitives::AutoLock auto_lock(active_lock);
- while(!message_queue_.IsShuttingDown()){
+ CREATE_LOGGERPTR_LOCAL(logger_, "Utils")
+ LOG4CXX_AUTO_TRACE(logger_);
+ while (!message_queue_.IsShuttingDown()) {
DrainQue();
message_queue_.wait();
}
@@ -152,18 +171,15 @@ void MessageLoopThread<Q>::LoopThreadDelegate::threadMain() {
}
template<class Q>
-bool MessageLoopThread<Q>::LoopThreadDelegate::exitThreadMain() {
+void MessageLoopThread<Q>::LoopThreadDelegate::exitThreadMain() {
+ CREATE_LOGGERPTR_LOCAL(logger_, "Utils")
+ LOG4CXX_AUTO_TRACE(logger_);
message_queue_.Shutdown();
- {
- sync_primitives::AutoLock auto_lock(active_lock);
- // Prevent canceling thread until queue is drained
- }
- return true;
}
template<class Q>
void MessageLoopThread<Q>::LoopThreadDelegate::DrainQue() {
- while(!message_queue_.empty()) {
+ while (!message_queue_.empty()) {
handler_.Handle(message_queue_.pop());
}
}