diff options
Diffstat (limited to 'src/components/include/utils/threads')
5 files changed, 164 insertions, 109 deletions
diff --git a/src/components/include/utils/threads/CMakeLists.txt b/src/components/include/utils/threads/CMakeLists.txt new file mode 100644 index 000000000..f97039c21 --- /dev/null +++ b/src/components/include/utils/threads/CMakeLists.txt @@ -0,0 +1,5 @@ +set(UtilsIncludeDir ${COMPONENTS_DIR/utils/include) + +include_directories ( + ${UtilsIncludeDir} +)
\ No newline at end of file diff --git a/src/components/include/utils/threads/message_loop_thread.h b/src/components/include/utils/threads/message_loop_thread.h index e051c4890..6f90df209 100644 --- a/src/components/include/utils/threads/message_loop_thread.h +++ b/src/components/include/utils/threads/message_loop_thread.h @@ -39,13 +39,16 @@ #include "utils/logger.h" #include "utils/macro.h" #include "utils/message_queue.h" -#include "utils/threads/thread_manager.h" -#include "utils/lock.h" +#include "utils/threads/thread.h" +#include "utils/shared_ptr.h" namespace threads { -/* - * Class that handles a thread which sole purpose is to pump messages pushed +using ::utils::MessageQueue; + +/** + * \class MessageLoopThread + * \brief Handles a thread which sole purpose is to pump messages pushed * to it's queue. To handle messages someone, Handler must be implemented and * passed to MessageLoopThread constructor. */ @@ -79,6 +82,10 @@ class MessageLoopThread { // Places a message to the therad's queue. Thread-safe. void PostMessage(const Message& message); + + // Process already posted messages and stop thread processing. Thread-safe. + void Shutdown(); + private: /* * Implementation of ThreadDelegate that actually pumps the queue and is @@ -90,19 +97,20 @@ class MessageLoopThread { // threads::ThreadDelegate overrides virtual void threadMain() OVERRIDE; - virtual bool exitThreadMain() OVERRIDE; + virtual void exitThreadMain() OVERRIDE; + private: // Handle all messages that are in the queue until it is empty void DrainQue(); - private: // Handler that processes messages Handler& handler_; // Message queue that is actually owned by MessageLoopThread MessageQueue<Message, Queue>& message_queue_; - sync_primitives::Lock active_lock; }; + private: MessageQueue<Message, Queue> message_queue_; + LoopThreadDelegate* thread_delegate_; threads::Thread* thread_; }; @@ -112,8 +120,10 @@ template<class Q> MessageLoopThread<Q>::MessageLoopThread(const std::string& name, Handler* handler, const ThreadOptions& thread_opts) - : thread_(threads::CreateThread(name.c_str(), new LoopThreadDelegate(&message_queue_, handler))) { - bool started = thread_->startWithOptions(thread_opts); + : thread_delegate_(new LoopThreadDelegate(&message_queue_, handler)), + thread_(threads::CreateThread(name.c_str(), + thread_delegate_)) { + const bool started = thread_->start(thread_opts); if (!started) { CREATE_LOGGERPTR_LOCAL(logger_, "Utils") LOG4CXX_ERROR(logger_, "Failed to start thread " << name); @@ -122,7 +132,10 @@ MessageLoopThread<Q>::MessageLoopThread(const std::string& name, template<class Q> MessageLoopThread<Q>::~MessageLoopThread() { - thread_->stop(); + Shutdown(); + thread_->join(); + delete thread_delegate_; + threads::DeleteThread(thread_); } template <class Q> @@ -130,6 +143,11 @@ void MessageLoopThread<Q>::PostMessage(const Message& message) { message_queue_.push(message); } +template <class Q> +void MessageLoopThread<Q>::Shutdown() { + thread_->stop(); +} + ////////// template<class Q> MessageLoopThread<Q>::LoopThreadDelegate::LoopThreadDelegate( @@ -142,8 +160,9 @@ MessageLoopThread<Q>::LoopThreadDelegate::LoopThreadDelegate( template<class Q> void MessageLoopThread<Q>::LoopThreadDelegate::threadMain() { - sync_primitives::AutoLock auto_lock(active_lock); - while(!message_queue_.IsShuttingDown()){ + CREATE_LOGGERPTR_LOCAL(logger_, "Utils") + LOG4CXX_AUTO_TRACE(logger_); + while (!message_queue_.IsShuttingDown()) { DrainQue(); message_queue_.wait(); } @@ -152,18 +171,15 @@ void MessageLoopThread<Q>::LoopThreadDelegate::threadMain() { } template<class Q> -bool MessageLoopThread<Q>::LoopThreadDelegate::exitThreadMain() { +void MessageLoopThread<Q>::LoopThreadDelegate::exitThreadMain() { + CREATE_LOGGERPTR_LOCAL(logger_, "Utils") + LOG4CXX_AUTO_TRACE(logger_); message_queue_.Shutdown(); - { - sync_primitives::AutoLock auto_lock(active_lock); - // Prevent canceling thread until queue is drained - } - return true; } template<class Q> void MessageLoopThread<Q>::LoopThreadDelegate::DrainQue() { - while(!message_queue_.empty()) { + while (!message_queue_.empty()) { handler_.Handle(message_queue_.pop()); } } diff --git a/src/components/include/utils/threads/thread.h b/src/components/include/utils/threads/thread.h index 3b81cf345..fd2b5e9fd 100644 --- a/src/components/include/utils/threads/thread.h +++ b/src/components/include/utils/threads/thread.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014, Ford Motor Company + * Copyright (c) 2015, Ford Motor Company * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -43,19 +43,19 @@ #include "utils/macro.h" #include "utils/threads/thread_delegate.h" #include "utils/threads/thread_options.h" +#include "utils/conditional_variable.h" +#include "utils/lock.h" namespace threads { -namespace impl { #if defined(OS_POSIX) typedef pthread_t PlatformThreadHandle; #else #error Please implement thread for your OS #endif -} /** - * Non platform specific thread abstraction that establishes a + * @brief Non platform specific thread abstraction that establishes a * threads::ThreadDelegate on a new thread. * * ThreadDelegate example: @@ -75,60 +75,72 @@ typedef pthread_t PlatformThreadHandle; * thread.join(); * printf("ok!\n"); */ + class Thread; +void enqueue_to_join(Thread* thread); + Thread* CreateThread(const char* name, ThreadDelegate* delegate); -void DeleteThread(Thread*); +void DeleteThread(Thread* thread); class Thread { - friend Thread* CreateThread(const char*, ThreadDelegate*); - friend void DeleteThread(Thread*); - public: - /** - * Class that represents unique in-process thread identifier - * due to restriction of pthread API it only allows checks - * for equality to different thread id and no ordering. - * - * ostream<< operator is provided for this class which - * outputs thread name associated to an identifier. - */ - class Id { - public: - explicit Id(const impl::PlatformThreadHandle& id): id_(id) {} - bool operator==(const Id& that) const; - impl::PlatformThreadHandle Handle() const { return id_; } - private: - impl::PlatformThreadHandle id_; - friend class Thread; - }; - - // Get unique ID of currently executing thread - static Id CurrentId(); - - // Get name associated with thread identified by thread_id - static std::string NameFromId(const Id& thread_id); - - // Give thread thread_id a name, helpful for debugging - static void SetNameForId(const Id& thread_id, const std::string& name); + private: + const std::string name_; + // Should be locked to protect delegate_ value + sync_primitives::Lock delegate_lock_; + ThreadDelegate* delegate_; + PlatformThreadHandle handle_; + ThreadOptions thread_options_; + // Should be locked to protect isThreadRunning_ and thread_created_ values + sync_primitives::Lock state_lock_; + volatile unsigned int isThreadRunning_; + volatile bool stopped_; + volatile bool finalized_; + bool thread_created_; + // Signalled when Thread::start() is called + sync_primitives::ConditionalVariable run_cond_; + public: /** - * Starts the thread. + * @brief Starts the thread. * @return true if the thread was successfully started. */ bool start(); - ThreadDelegate* delegate() const; - /** - * Starts the thread. Behaves exactly like Start in addition to + * @brief Starts the thread. Behaves exactly like \ref start() in addition to * allow to override the default options. - * @param options - thread options. Look for 'threads/thread_options.h' + * @param options Thread options. Look for 'threads/thread_options.h' * for details. * @return true if the thread was successfully started. */ - bool startWithOptions(const ThreadOptions& options); + bool start(const ThreadOptions& options); + + sync_primitives::Lock& delegate_lock() { + return delegate_lock_; + } + + ThreadDelegate *delegate() const { + return delegate_; + } + + void set_delegate(ThreadDelegate *delegate) { + DCHECK(!isThreadRunning_); + delegate_ = delegate; + } + + friend Thread* CreateThread(const char* name, ThreadDelegate* delegate); + friend void DeleteThread(Thread* thread); + + public: + // Get unique ID of currently executing thread + static PlatformThreadHandle CurrentId(); + + // Give thread thread_id a name, helpful for debugging + static void SetNameForId(const PlatformThreadHandle& thread_id, + std::string name); /** - * Signals the thread to exit and returns once the thread has exited. + * @brief Signals the thread to exit and returns once the thread has exited. * After this method returns, the Thread object is completely reset and may * be used as if it were newly constructed (i.e., Start may be called again). * @@ -137,16 +149,18 @@ class Thread { */ void stop(); + void join(); + /** - * Get thread name. + * @brief Get thread name. * @return thread name */ - const std::string& thread_name() { + const std::string& name() { return name_; } /** - * Returns true if the thread has been started, and not yet stopped. + * @brief Returns true if the thread has been started, and not yet stopped. * When a thread is running, the thread_id_ is non-zero. * @return true if the thread has been started, and not yet stopped. */ @@ -154,12 +168,10 @@ class Thread { return isThreadRunning_; } - void set_running(bool running) { - isThreadRunning_ = running; - } + void set_running(bool running); /** - * Is thread joinable? + * @brief Is thread joinable? * @return - Returns true if the thread is joinable. */ bool is_joinable() const { @@ -167,7 +179,7 @@ class Thread { } /** - * Thread stack size + * @brief Thread stack size * @return thread stack size */ size_t stack_size() const { @@ -175,23 +187,15 @@ class Thread { } /** - * The native thread handle. + * @brief The native thread handle. * @return thread handle. */ - impl::PlatformThreadHandle thread_handle() const { - return thread_handle_; - } - - /** - * Thread id. - * @return return thread id. - */ - Id thread_id() const { - return Id(thread_handle()); + PlatformThreadHandle thread_handle() const { + return handle_; } /** - * Thread options. + * @brief Thread options. * @return thread options. */ const ThreadOptions& thread_options() const { @@ -199,16 +203,12 @@ class Thread { } /** - * Minimum size of thread stack for specific platform. + * @brief Minimum size of thread stack for specific platform. */ static size_t kMinStackSize; protected: - const std::string name_; - ThreadDelegate* delegate_; - impl::PlatformThreadHandle thread_handle_; - ThreadOptions thread_options_; - volatile unsigned int isThreadRunning_; + sync_primitives::ConditionalVariable state_cond_; private: /** @@ -216,19 +216,17 @@ class Thread { * @param name - display string to identify the thread. * @param delegate - thread procedure delegate. Look for * 'threads/thread_delegate.h' for details. - * NOTE: delegate will be deleted by destructor. + * LifeCycle thread , otherwise it will be joined in stop method + * NOTE: delegate will be deleted after thread will be joined * This constructor made private to prevent * Thread object to be created on stack */ Thread(const char* name, ThreadDelegate* delegate); - + virtual ~Thread(); + static void* threadFunc(void* arg); + static void cleanup(void* arg); DISALLOW_COPY_AND_ASSIGN(Thread); - virtual ~Thread() { } }; -inline bool operator!= (const Thread::Id& left, const Thread::Id& right) { - return !(left == right); -} -std::ostream& operator<<(std::ostream& os, const Thread::Id& thread_id); } // namespace threads #endif // SRC_COMPONENTS_INCLUDE_UTILS_THREADS_THREAD_H_ diff --git a/src/components/include/utils/threads/thread_delegate.h b/src/components/include/utils/threads/thread_delegate.h index 47e68f1e8..66ad30241 100644 --- a/src/components/include/utils/threads/thread_delegate.h +++ b/src/components/include/utils/threads/thread_delegate.h @@ -35,30 +35,66 @@ #include <pthread.h> +#include "utils/lock.h" + namespace threads { +enum ThreadState { + kInit = 0, + kStarted = 1, + kStopReq = 2 +}; + +class Thread; + /** * Thread procedure interface. * Look for "threads/thread.h" for example */ class ThreadDelegate { - public: + public: + ThreadDelegate() + : state_(kInit), + thread_(NULL) { + } + /** + * \brief Thread procedure. + */ + virtual void threadMain() = 0; + + /** + * Should be called to free all resources allocated in threadMain + * and exiting threadMain + * This function should be blocking and return only when threadMain() will be + * finished in other case segmantation failes are possible + */ + virtual void exitThreadMain(); - /** - * Thread procedure. - */ - virtual void threadMain() = 0; + virtual ~ThreadDelegate(); - /** - * Should be called to free all resources allocated in threadMain - * and exiting threadMain - * This function should be blocking and return only when threadMain() will be - * finished in other case segmantation failes are possible - */ - virtual bool exitThreadMain() { - return false; + Thread* thread() const { + return thread_; + } + + void set_thread(Thread *thread); + + bool ImproveState(unsigned int to) { + state_lock_.Lock(); + if ((state_ + 1 == to) || (to == kInit && state_ == kStopReq)) { + state_ = to; } - virtual ~ThreadDelegate() { } + state_lock_.Unlock(); + return state_ == to; + } + + unsigned int state() const { + return state_; + } + + private: + volatile unsigned int state_; + sync_primitives::SpinMutex state_lock_; + Thread* thread_; }; } // namespace threads diff --git a/src/components/include/utils/threads/thread_options.h b/src/components/include/utils/threads/thread_options.h index 217f0815a..797ee0693 100644 --- a/src/components/include/utils/threads/thread_options.h +++ b/src/components/include/utils/threads/thread_options.h @@ -38,7 +38,7 @@ namespace threads { /** - * @breif Startup options for thread. + * @brief Startup options for thread. * Look for "threads/thread.h" for example */ class ThreadOptions { @@ -74,7 +74,7 @@ class ThreadOptions { * @param options - new options. * @return new options. */ - ThreadOptions& operator=(const ThreadOptions& options ) { + ThreadOptions& operator=(const ThreadOptions& options) { stack_size_ = options.stack_size(); is_joinable_ = options.is_joinable(); return *this; |
