summaryrefslogtreecommitdiff
path: root/src/components/include/utils/threads
diff options
context:
space:
mode:
Diffstat (limited to 'src/components/include/utils/threads')
-rw-r--r--src/components/include/utils/threads/CMakeLists.txt5
-rw-r--r--src/components/include/utils/threads/message_loop_thread.h54
-rw-r--r--src/components/include/utils/threads/thread.h146
-rw-r--r--src/components/include/utils/threads/thread_delegate.h64
-rw-r--r--src/components/include/utils/threads/thread_options.h4
5 files changed, 164 insertions, 109 deletions
diff --git a/src/components/include/utils/threads/CMakeLists.txt b/src/components/include/utils/threads/CMakeLists.txt
new file mode 100644
index 000000000..f97039c21
--- /dev/null
+++ b/src/components/include/utils/threads/CMakeLists.txt
@@ -0,0 +1,5 @@
+set(UtilsIncludeDir ${COMPONENTS_DIR/utils/include)
+
+include_directories (
+ ${UtilsIncludeDir}
+) \ No newline at end of file
diff --git a/src/components/include/utils/threads/message_loop_thread.h b/src/components/include/utils/threads/message_loop_thread.h
index e051c4890..6f90df209 100644
--- a/src/components/include/utils/threads/message_loop_thread.h
+++ b/src/components/include/utils/threads/message_loop_thread.h
@@ -39,13 +39,16 @@
#include "utils/logger.h"
#include "utils/macro.h"
#include "utils/message_queue.h"
-#include "utils/threads/thread_manager.h"
-#include "utils/lock.h"
+#include "utils/threads/thread.h"
+#include "utils/shared_ptr.h"
namespace threads {
-/*
- * Class that handles a thread which sole purpose is to pump messages pushed
+using ::utils::MessageQueue;
+
+/**
+ * \class MessageLoopThread
+ * \brief Handles a thread which sole purpose is to pump messages pushed
* to it's queue. To handle messages someone, Handler must be implemented and
* passed to MessageLoopThread constructor.
*/
@@ -79,6 +82,10 @@ class MessageLoopThread {
// Places a message to the therad's queue. Thread-safe.
void PostMessage(const Message& message);
+
+ // Process already posted messages and stop thread processing. Thread-safe.
+ void Shutdown();
+
private:
/*
* Implementation of ThreadDelegate that actually pumps the queue and is
@@ -90,19 +97,20 @@ class MessageLoopThread {
// threads::ThreadDelegate overrides
virtual void threadMain() OVERRIDE;
- virtual bool exitThreadMain() OVERRIDE;
+ virtual void exitThreadMain() OVERRIDE;
+
private:
// Handle all messages that are in the queue until it is empty
void DrainQue();
- private:
// Handler that processes messages
Handler& handler_;
// Message queue that is actually owned by MessageLoopThread
MessageQueue<Message, Queue>& message_queue_;
- sync_primitives::Lock active_lock;
};
+
private:
MessageQueue<Message, Queue> message_queue_;
+ LoopThreadDelegate* thread_delegate_;
threads::Thread* thread_;
};
@@ -112,8 +120,10 @@ template<class Q>
MessageLoopThread<Q>::MessageLoopThread(const std::string& name,
Handler* handler,
const ThreadOptions& thread_opts)
- : thread_(threads::CreateThread(name.c_str(), new LoopThreadDelegate(&message_queue_, handler))) {
- bool started = thread_->startWithOptions(thread_opts);
+ : thread_delegate_(new LoopThreadDelegate(&message_queue_, handler)),
+ thread_(threads::CreateThread(name.c_str(),
+ thread_delegate_)) {
+ const bool started = thread_->start(thread_opts);
if (!started) {
CREATE_LOGGERPTR_LOCAL(logger_, "Utils")
LOG4CXX_ERROR(logger_, "Failed to start thread " << name);
@@ -122,7 +132,10 @@ MessageLoopThread<Q>::MessageLoopThread(const std::string& name,
template<class Q>
MessageLoopThread<Q>::~MessageLoopThread() {
- thread_->stop();
+ Shutdown();
+ thread_->join();
+ delete thread_delegate_;
+ threads::DeleteThread(thread_);
}
template <class Q>
@@ -130,6 +143,11 @@ void MessageLoopThread<Q>::PostMessage(const Message& message) {
message_queue_.push(message);
}
+template <class Q>
+void MessageLoopThread<Q>::Shutdown() {
+ thread_->stop();
+}
+
//////////
template<class Q>
MessageLoopThread<Q>::LoopThreadDelegate::LoopThreadDelegate(
@@ -142,8 +160,9 @@ MessageLoopThread<Q>::LoopThreadDelegate::LoopThreadDelegate(
template<class Q>
void MessageLoopThread<Q>::LoopThreadDelegate::threadMain() {
- sync_primitives::AutoLock auto_lock(active_lock);
- while(!message_queue_.IsShuttingDown()){
+ CREATE_LOGGERPTR_LOCAL(logger_, "Utils")
+ LOG4CXX_AUTO_TRACE(logger_);
+ while (!message_queue_.IsShuttingDown()) {
DrainQue();
message_queue_.wait();
}
@@ -152,18 +171,15 @@ void MessageLoopThread<Q>::LoopThreadDelegate::threadMain() {
}
template<class Q>
-bool MessageLoopThread<Q>::LoopThreadDelegate::exitThreadMain() {
+void MessageLoopThread<Q>::LoopThreadDelegate::exitThreadMain() {
+ CREATE_LOGGERPTR_LOCAL(logger_, "Utils")
+ LOG4CXX_AUTO_TRACE(logger_);
message_queue_.Shutdown();
- {
- sync_primitives::AutoLock auto_lock(active_lock);
- // Prevent canceling thread until queue is drained
- }
- return true;
}
template<class Q>
void MessageLoopThread<Q>::LoopThreadDelegate::DrainQue() {
- while(!message_queue_.empty()) {
+ while (!message_queue_.empty()) {
handler_.Handle(message_queue_.pop());
}
}
diff --git a/src/components/include/utils/threads/thread.h b/src/components/include/utils/threads/thread.h
index 3b81cf345..fd2b5e9fd 100644
--- a/src/components/include/utils/threads/thread.h
+++ b/src/components/include/utils/threads/thread.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2014, Ford Motor Company
+ * Copyright (c) 2015, Ford Motor Company
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -43,19 +43,19 @@
#include "utils/macro.h"
#include "utils/threads/thread_delegate.h"
#include "utils/threads/thread_options.h"
+#include "utils/conditional_variable.h"
+#include "utils/lock.h"
namespace threads {
-namespace impl {
#if defined(OS_POSIX)
typedef pthread_t PlatformThreadHandle;
#else
#error Please implement thread for your OS
#endif
-}
/**
- * Non platform specific thread abstraction that establishes a
+ * @brief Non platform specific thread abstraction that establishes a
* threads::ThreadDelegate on a new thread.
*
* ThreadDelegate example:
@@ -75,60 +75,72 @@ typedef pthread_t PlatformThreadHandle;
* thread.join();
* printf("ok!\n");
*/
+
class Thread;
+void enqueue_to_join(Thread* thread);
+
Thread* CreateThread(const char* name, ThreadDelegate* delegate);
-void DeleteThread(Thread*);
+void DeleteThread(Thread* thread);
class Thread {
- friend Thread* CreateThread(const char*, ThreadDelegate*);
- friend void DeleteThread(Thread*);
- public:
- /**
- * Class that represents unique in-process thread identifier
- * due to restriction of pthread API it only allows checks
- * for equality to different thread id and no ordering.
- *
- * ostream<< operator is provided for this class which
- * outputs thread name associated to an identifier.
- */
- class Id {
- public:
- explicit Id(const impl::PlatformThreadHandle& id): id_(id) {}
- bool operator==(const Id& that) const;
- impl::PlatformThreadHandle Handle() const { return id_; }
- private:
- impl::PlatformThreadHandle id_;
- friend class Thread;
- };
-
- // Get unique ID of currently executing thread
- static Id CurrentId();
-
- // Get name associated with thread identified by thread_id
- static std::string NameFromId(const Id& thread_id);
-
- // Give thread thread_id a name, helpful for debugging
- static void SetNameForId(const Id& thread_id, const std::string& name);
+ private:
+ const std::string name_;
+ // Should be locked to protect delegate_ value
+ sync_primitives::Lock delegate_lock_;
+ ThreadDelegate* delegate_;
+ PlatformThreadHandle handle_;
+ ThreadOptions thread_options_;
+ // Should be locked to protect isThreadRunning_ and thread_created_ values
+ sync_primitives::Lock state_lock_;
+ volatile unsigned int isThreadRunning_;
+ volatile bool stopped_;
+ volatile bool finalized_;
+ bool thread_created_;
+ // Signalled when Thread::start() is called
+ sync_primitives::ConditionalVariable run_cond_;
+ public:
/**
- * Starts the thread.
+ * @brief Starts the thread.
* @return true if the thread was successfully started.
*/
bool start();
- ThreadDelegate* delegate() const;
-
/**
- * Starts the thread. Behaves exactly like Start in addition to
+ * @brief Starts the thread. Behaves exactly like \ref start() in addition to
* allow to override the default options.
- * @param options - thread options. Look for 'threads/thread_options.h'
+ * @param options Thread options. Look for 'threads/thread_options.h'
* for details.
* @return true if the thread was successfully started.
*/
- bool startWithOptions(const ThreadOptions& options);
+ bool start(const ThreadOptions& options);
+
+ sync_primitives::Lock& delegate_lock() {
+ return delegate_lock_;
+ }
+
+ ThreadDelegate *delegate() const {
+ return delegate_;
+ }
+
+ void set_delegate(ThreadDelegate *delegate) {
+ DCHECK(!isThreadRunning_);
+ delegate_ = delegate;
+ }
+
+ friend Thread* CreateThread(const char* name, ThreadDelegate* delegate);
+ friend void DeleteThread(Thread* thread);
+
+ public:
+ // Get unique ID of currently executing thread
+ static PlatformThreadHandle CurrentId();
+
+ // Give thread thread_id a name, helpful for debugging
+ static void SetNameForId(const PlatformThreadHandle& thread_id,
+ std::string name);
/**
- * Signals the thread to exit and returns once the thread has exited.
+ * @brief Signals the thread to exit and returns once the thread has exited.
* After this method returns, the Thread object is completely reset and may
* be used as if it were newly constructed (i.e., Start may be called again).
*
@@ -137,16 +149,18 @@ class Thread {
*/
void stop();
+ void join();
+
/**
- * Get thread name.
+ * @brief Get thread name.
* @return thread name
*/
- const std::string& thread_name() {
+ const std::string& name() {
return name_;
}
/**
- * Returns true if the thread has been started, and not yet stopped.
+ * @brief Returns true if the thread has been started, and not yet stopped.
* When a thread is running, the thread_id_ is non-zero.
* @return true if the thread has been started, and not yet stopped.
*/
@@ -154,12 +168,10 @@ class Thread {
return isThreadRunning_;
}
- void set_running(bool running) {
- isThreadRunning_ = running;
- }
+ void set_running(bool running);
/**
- * Is thread joinable?
+ * @brief Is thread joinable?
* @return - Returns true if the thread is joinable.
*/
bool is_joinable() const {
@@ -167,7 +179,7 @@ class Thread {
}
/**
- * Thread stack size
+ * @brief Thread stack size
* @return thread stack size
*/
size_t stack_size() const {
@@ -175,23 +187,15 @@ class Thread {
}
/**
- * The native thread handle.
+ * @brief The native thread handle.
* @return thread handle.
*/
- impl::PlatformThreadHandle thread_handle() const {
- return thread_handle_;
- }
-
- /**
- * Thread id.
- * @return return thread id.
- */
- Id thread_id() const {
- return Id(thread_handle());
+ PlatformThreadHandle thread_handle() const {
+ return handle_;
}
/**
- * Thread options.
+ * @brief Thread options.
* @return thread options.
*/
const ThreadOptions& thread_options() const {
@@ -199,16 +203,12 @@ class Thread {
}
/**
- * Minimum size of thread stack for specific platform.
+ * @brief Minimum size of thread stack for specific platform.
*/
static size_t kMinStackSize;
protected:
- const std::string name_;
- ThreadDelegate* delegate_;
- impl::PlatformThreadHandle thread_handle_;
- ThreadOptions thread_options_;
- volatile unsigned int isThreadRunning_;
+ sync_primitives::ConditionalVariable state_cond_;
private:
/**
@@ -216,19 +216,17 @@ class Thread {
* @param name - display string to identify the thread.
* @param delegate - thread procedure delegate. Look for
* 'threads/thread_delegate.h' for details.
- * NOTE: delegate will be deleted by destructor.
+ * LifeCycle thread , otherwise it will be joined in stop method
+ * NOTE: delegate will be deleted after thread will be joined
* This constructor made private to prevent
* Thread object to be created on stack
*/
Thread(const char* name, ThreadDelegate* delegate);
-
+ virtual ~Thread();
+ static void* threadFunc(void* arg);
+ static void cleanup(void* arg);
DISALLOW_COPY_AND_ASSIGN(Thread);
- virtual ~Thread() { }
};
-inline bool operator!= (const Thread::Id& left, const Thread::Id& right) {
- return !(left == right);
-}
-std::ostream& operator<<(std::ostream& os, const Thread::Id& thread_id);
} // namespace threads
#endif // SRC_COMPONENTS_INCLUDE_UTILS_THREADS_THREAD_H_
diff --git a/src/components/include/utils/threads/thread_delegate.h b/src/components/include/utils/threads/thread_delegate.h
index 47e68f1e8..66ad30241 100644
--- a/src/components/include/utils/threads/thread_delegate.h
+++ b/src/components/include/utils/threads/thread_delegate.h
@@ -35,30 +35,66 @@
#include <pthread.h>
+#include "utils/lock.h"
+
namespace threads {
+enum ThreadState {
+ kInit = 0,
+ kStarted = 1,
+ kStopReq = 2
+};
+
+class Thread;
+
/**
* Thread procedure interface.
* Look for "threads/thread.h" for example
*/
class ThreadDelegate {
- public:
+ public:
+ ThreadDelegate()
+ : state_(kInit),
+ thread_(NULL) {
+ }
+ /**
+ * \brief Thread procedure.
+ */
+ virtual void threadMain() = 0;
+
+ /**
+ * Should be called to free all resources allocated in threadMain
+ * and exiting threadMain
+ * This function should be blocking and return only when threadMain() will be
+ * finished in other case segmantation failes are possible
+ */
+ virtual void exitThreadMain();
- /**
- * Thread procedure.
- */
- virtual void threadMain() = 0;
+ virtual ~ThreadDelegate();
- /**
- * Should be called to free all resources allocated in threadMain
- * and exiting threadMain
- * This function should be blocking and return only when threadMain() will be
- * finished in other case segmantation failes are possible
- */
- virtual bool exitThreadMain() {
- return false;
+ Thread* thread() const {
+ return thread_;
+ }
+
+ void set_thread(Thread *thread);
+
+ bool ImproveState(unsigned int to) {
+ state_lock_.Lock();
+ if ((state_ + 1 == to) || (to == kInit && state_ == kStopReq)) {
+ state_ = to;
}
- virtual ~ThreadDelegate() { }
+ state_lock_.Unlock();
+ return state_ == to;
+ }
+
+ unsigned int state() const {
+ return state_;
+ }
+
+ private:
+ volatile unsigned int state_;
+ sync_primitives::SpinMutex state_lock_;
+ Thread* thread_;
};
} // namespace threads
diff --git a/src/components/include/utils/threads/thread_options.h b/src/components/include/utils/threads/thread_options.h
index 217f0815a..797ee0693 100644
--- a/src/components/include/utils/threads/thread_options.h
+++ b/src/components/include/utils/threads/thread_options.h
@@ -38,7 +38,7 @@
namespace threads {
/**
- * @breif Startup options for thread.
+ * @brief Startup options for thread.
* Look for "threads/thread.h" for example
*/
class ThreadOptions {
@@ -74,7 +74,7 @@ class ThreadOptions {
* @param options - new options.
* @return new options.
*/
- ThreadOptions& operator=(const ThreadOptions& options ) {
+ ThreadOptions& operator=(const ThreadOptions& options) {
stack_size_ = options.stack_size();
is_joinable_ = options.is_joinable();
return *this;