diff options
33 files changed, 861 insertions, 514 deletions
diff --git a/src/appMain/low_voltage_signals_handler.cc b/src/appMain/low_voltage_signals_handler.cc index bb7bbfbac7..8a3d0dfb9b 100644 --- a/src/appMain/low_voltage_signals_handler.cc +++ b/src/appMain/low_voltage_signals_handler.cc @@ -60,7 +60,7 @@ LowVoltageSignalsHandler::LowVoltageSignalsHandler( , cpid_(-1) { sigemptyset(&lv_mask_); sigaddset(&lv_mask_, SIGLOWVOLTAGE_); - signals_handler_thread_->start(); + signals_handler_thread_->Start(); } sigset_t LowVoltageSignalsHandler::LowVoltageSignalsMask() const { @@ -81,7 +81,7 @@ int LowVoltageSignalsHandler::ignition_off_signo() const { void LowVoltageSignalsHandler::Destroy() { if (signals_handler_thread_) { - signals_handler_thread_->join(); + signals_handler_thread_->Stop(threads::Thread::kThreadSoftStop); } notifications_delegate_.reset(); threads::DeleteThread(signals_handler_thread_); diff --git a/src/components/application_manager/src/request_controller.cc b/src/components/application_manager/src/request_controller.cc index eaebaf4861..1273abe513 100644 --- a/src/components/application_manager/src/request_controller.cc +++ b/src/components/application_manager/src/request_controller.cc @@ -83,7 +83,7 @@ void RequestController::InitializeThreadpool() { for (uint32_t i = 0; i < pool_size_; i++) { snprintf(name, sizeof(name) / sizeof(name[0]), "AM Pool %d", i); pool_.push_back(threads::CreateThread(name, new Worker(this))); - pool_[i]->start(); + pool_[i]->Start(); LOG4CXX_DEBUG(logger_, "Request thread initialized: " << name); } } @@ -98,8 +98,8 @@ void RequestController::DestroyThreadpool() { } for (size_t i = 0; i < pool_.size(); i++) { threads::Thread* thread = pool_[i]; - thread->join(); - delete thread->delegate(); + thread->Stop(threads::Thread::kThreadSoftStop); + delete thread->GetDelegate(); threads::DeleteThread(thread); } pool_.clear(); diff --git a/src/components/connection_handler/src/connection.cc b/src/components/connection_handler/src/connection.cc index 304be9ddb0..9dcf2540a4 100644 --- a/src/components/connection_handler/src/connection.cc +++ b/src/components/connection_handler/src/connection.cc @@ -89,12 +89,12 @@ Connection::Connection(ConnectionHandle connection_handle, heartbeat_monitor_ = new HeartBeatMonitor(heartbeat_timeout_, this); heart_beat_monitor_thread_ = threads::CreateThread("HeartBeatMonitor", heartbeat_monitor_); - heart_beat_monitor_thread_->start(); + heart_beat_monitor_thread_->Start(); } Connection::~Connection() { LOG4CXX_AUTO_TRACE(logger_); - heart_beat_monitor_thread_->join(); + heart_beat_monitor_thread_->Stop(threads::Thread::kThreadSoftStop); delete heartbeat_monitor_; threads::DeleteThread(heart_beat_monitor_thread_); diff --git a/src/components/hmi_message_handler/src/websocket_session.cc b/src/components/hmi_message_handler/src/websocket_session.cc index 7ffb35aba6..b58c118e3d 100644 --- a/src/components/hmi_message_handler/src/websocket_session.cc +++ b/src/components/hmi_message_handler/src/websocket_session.cc @@ -49,7 +49,7 @@ WebsocketSession::WebsocketSession(boost::asio::ip::tcp::socket socket, , thread_delegate_(new LoopThreadDelegate(&message_queue_, this)) , thread_(threads::CreateThread("WS Async Send", thread_delegate_)) { m_writer["indentation"] = ""; - thread_->start(threads::ThreadOptions()); + thread_->Start(threads::ThreadOptions()); } WebsocketSession::~WebsocketSession() {} @@ -64,7 +64,7 @@ void WebsocketSession::Accept() { void WebsocketSession::Shutdown() { shutdown_ = true; thread_delegate_->SetShutdown(); - thread_->join(); + thread_->Stop(threads::Thread::kThreadSoftStop); delete thread_delegate_; threads::DeleteThread(thread_); } diff --git a/src/components/include/utils/threads/message_loop_thread.h b/src/components/include/utils/threads/message_loop_thread.h index 26ae127b69..b1a6714be1 100644 --- a/src/components/include/utils/threads/message_loop_thread.h +++ b/src/components/include/utils/threads/message_loop_thread.h @@ -138,7 +138,7 @@ MessageLoopThread<Q>::MessageLoopThread(const std::string& name, const ThreadOptions& thread_opts) : thread_delegate_(new LoopThreadDelegate(&message_queue_, handler)) , thread_(threads::CreateThread(name.c_str(), thread_delegate_)) { - const bool started = thread_->start(thread_opts); + const bool started = thread_->Start(thread_opts); if (!started) { CREATE_LOGGERPTR_LOCAL(logger_, "Utils") LOG4CXX_ERROR(logger_, "Failed to start thread " << name); @@ -159,7 +159,7 @@ void MessageLoopThread<Q>::PostMessage(const Message& message) { template <class Q> void MessageLoopThread<Q>::Shutdown() { - thread_->join(); + thread_->Stop(threads::Thread::kThreadStopDelegate); } template <class Q> diff --git a/src/components/include/utils/threads/thread.h b/src/components/include/utils/threads/thread.h index 6f72679d63..73a43c48b1 100644 --- a/src/components/include/utils/threads/thread.h +++ b/src/components/include/utils/threads/thread.h @@ -76,36 +76,73 @@ typedef pthread_t PlatformThreadHandle; * printf("ok!\n"); */ -class Thread; -void enqueue_to_join(Thread* thread); - Thread* CreateThread(const char* name, ThreadDelegate* delegate); void DeleteThread(Thread* thread); class Thread { - private: - const std::string name_; - // Should be locked to protect delegate_ value - sync_primitives::Lock delegate_lock_; - ThreadDelegate* delegate_; - PlatformThreadHandle handle_; - ThreadOptions thread_options_; - // Should be locked to protect isThreadRunning_ and thread_created_ values - sync_primitives::Lock state_lock_; - volatile unsigned int isThreadRunning_; - volatile bool stopped_; - volatile bool finalized_; - bool thread_created_; - // Signalled when Thread::start() is called - sync_primitives::ConditionalVariable run_cond_; - public: - static int count; + /** + * @brief ThreadCommand is used to command the thread + * kThreadCommandNone - no command, used to indicate that there is no pending + * command + * kThreadCommandRun - commands thread to run (do another iteration) + * kThreadCommandFinalize - informs thread that must exit + */ + enum ThreadCommand { + kThreadCommandNone, // must be first + kThreadCommandRun, + kThreadCommandFinalize // keep last + // in case of new commands - update/check threadFunc() + }; + + /** + * @brief ThreadState informs outside world about its state + * kThreadStateError - if pthread_create returned an error + * kThreadStateNone - there is no thread at all + * kThreadStateIdle - the thread is in state idle + * kThreadStateRunning - thread is in state running (executing delegates + * threadMain()) + * kThreadStateCompleted - thread completed + */ + enum ThreadState { + kThreadStateError = -1, + kThreadStateNone, + kThreadStateIdle, + kThreadStateRunning, + kThreadStateCompleted + }; + + /** + * @brief ThreadStopOption + * kThreadStopDelegate - executing delegates exitThreadMain and + * move thread to kThreadStateIdle + * kThreadSoftStop - executing kThreadStopDelegate and + * move thread to kThreadStateCompleted + * kThreadForceStop - executing kThreadSoftStop, + * if necessary pthread_cancel or pthread_exit and + * move thread to kThreadStateCompleted + */ + enum ThreadStopOption { + kThreadStopDelegate, + kThreadSoftStop, + kThreadForceStop + }; + + /** + * @brief ThreadJoinOption + * kThreadJoinDelegate - waiting for finish threadMain + * kThreadJoinThread - waiting for finish threadFunc + */ + enum ThreadJoinOption { kThreadJoinDelegate, kThreadJoinThread }; + + friend Thread* CreateThread(const char* name, ThreadDelegate* delegate); + friend void DeleteThread(Thread* thread); + /** * @brief Starts the thread. * @return true if the thread was successfully started. */ - bool start(); + bool Start(); /** * @brief Starts the thread. Behaves exactly like \ref start() in addition to @@ -114,26 +151,44 @@ class Thread { * for details. * @return true if the thread was successfully started. */ - bool start(const ThreadOptions& options); + bool Start(const ThreadOptions& options); - sync_primitives::Lock& delegate_lock() { - return delegate_lock_; - } + /** + * @brief Signals the thread to exit and returns once the thread has exited. + * After this method returns, the Thread object is completely reset and may + * be used as if it were newly constructed (i.e., Start may be called again). + * + * Stop may be called multiple times and is simply ignored if the thread is + * already stopped. + * + * Stop will wait for delegate exit + */ + bool Stop(const ThreadStopOption stop_option); - ThreadDelegate* delegate() const { + /** + * @brief Blocks the current thread until + * the fucntion identified by join_option finishes execution. + * If that fucntion has already terminated, then + * Join returns immediately. + * @param join_option - specify function to wait + */ + void Join(const ThreadJoinOption join_option); + + ThreadDelegate* GetDelegate() const { return delegate_; } - void set_delegate(ThreadDelegate* delegate) { + void SetDelegate(ThreadDelegate* delegate) { + Stop(kThreadStopDelegate); delegate_ = delegate; } - friend Thread* CreateThread(const char* name, ThreadDelegate* delegate); - friend void DeleteThread(Thread* thread); - - public: - // Yield current thread - static void yield(); + /** + * @brief Causes the calling thread to relinquish the CPU. The + * thread is moved to the end of the queue for its static priority and a + * new thread gets to run. + */ + static void SchedYield(); // Get unique ID of currently executing thread static PlatformThreadHandle CurrentId(); @@ -143,22 +198,10 @@ class Thread { std::string name); /** - * @brief Signals the thread to exit and returns once the thread has exited. - * After this method returns, the Thread object is completely reset and may - * be used as if it were newly constructed (i.e., Start may be called again). - * - * Stop may be called multiple times and is simply ignored if the thread is - * already stopped. - */ - void stop(); - - void join(); - - /** * @brief Get thread name. * @return thread name */ - const std::string& name() { + const std::string& GetThreadName() { return name_; } @@ -167,17 +210,16 @@ class Thread { * When a thread is running, the thread_id_ is non-zero. * @return true if the thread has been started, and not yet stopped. */ - bool is_running() const { - return isThreadRunning_; + bool IsRunning() { + sync_primitives::AutoLock auto_lock(state_lock_); + return kThreadStateRunning == thread_state_; } - void set_running(bool running); - /** * @brief Is thread joinable? * @return - Returns true if the thread is joinable. */ - bool is_joinable() const { + bool IsJoinable() const { return thread_options_.is_joinable(); } @@ -185,7 +227,7 @@ class Thread { * @brief Thread stack size * @return thread stack size */ - size_t stack_size() const { + size_t StackSize() const { return thread_options_.stack_size(); } @@ -193,7 +235,7 @@ class Thread { * @brief The native thread handle. * @return thread handle. */ - PlatformThreadHandle thread_handle() const { + PlatformThreadHandle ThreadHandle() const { return handle_; } @@ -207,7 +249,7 @@ class Thread { * @brief Thread options. * @return thread options. */ - const ThreadOptions& thread_options() const { + const ThreadOptions& GetThreadOptions() const { return thread_options_; } @@ -216,9 +258,6 @@ class Thread { */ static size_t kMinStackSize; - protected: - sync_primitives::ConditionalVariable state_cond_; - private: /** * Ctor. @@ -235,6 +274,83 @@ class Thread { static void* threadFunc(void* arg); static void cleanup(void* arg); DISALLOW_COPY_AND_ASSIGN(Thread); + + /** + * @brief Initializes the thread attributes and + * set thread options into attributes + * @param thread_options - thread options + * @return pthread_attr_t - initialized the thread attributes + */ + pthread_attr_t SetThreadCreationAttributes(ThreadOptions* thread_options); + + /** + * @brief Executing delegates exitThreadMain and move thread to + * kThreadStateIdle. That funciton is not thread safe. + * @param auto_lock - Locked object is used to wait + * thread iteration completion + * @return true if delegate has been successfully stopped, + * false otherwise + */ + bool StopDelegate(sync_primitives::AutoLock& auto_lock); + + /** + * @brief Executing StopDelegate and run kThreadCommandFinalize command, + * move thread to kThreadStateCompleted, + * that funciton is not thread safe + * @param auto_lock - Locked object used for waiting + * of the last iteration in thread + * @return true if thread has been successfully stopped, + * false otherwise + */ + bool StopSoft(sync_primitives::AutoLock& auto_lock); + + /** + * @brief Executing StopSoft, if necessary pthread_cancel or pthread_exit + * and move thread to kThreadStateCompleted, + * that funciton is not thread safe + * @param auto_lock - Locked object used for waiting + * of the last iteration in thread + */ + void StopForce(sync_primitives::AutoLock& auto_lock); + + /** + * @brief Waiting finished iteration in thread, + * that funciton is not thread safe + * @param auto_lock - Locked object using for waiting + * finishing iteration in thread + */ + void JoinDelegate(sync_primitives::AutoLock& auto_lock); + + const std::string name_; + ThreadDelegate* delegate_; + PlatformThreadHandle handle_; + ThreadOptions thread_options_; + // Should be locked to protect thread state + sync_primitives::Lock state_lock_; + sync_primitives::ConditionalVariable state_cond_; + + /** + * @brief Used to request actions from worker thread. + */ + volatile ThreadCommand thread_command_; + + /** + * @brief Used from worker thread to inform about its status. + */ + volatile ThreadState thread_state_; + +#ifdef BUILD_TESTS + FRIEND_TEST(PosixThreadTest, + StartThreadWithNullPtrDelegate_ExpectThreadStateNone); + FRIEND_TEST(PosixThreadTest, + StartThreadExecutingThreadMain_ExpectThreadStateRunning); + FRIEND_TEST( + PosixThreadTest, + StartThreadExecutingThreadMainCallStopDelegate_ExpectThreadStateIdle); + FRIEND_TEST( + PosixThreadTest, + StartThreadExecutingThreadMainCallForceStop_ExpectThreadStateCompleted); +#endif }; } // namespace threads diff --git a/src/components/media_manager/src/audio/a2dp_source_player_adapter.cc b/src/components/media_manager/src/audio/a2dp_source_player_adapter.cc index 581997f610..86c3a322eb 100644 --- a/src/components/media_manager/src/audio/a2dp_source_player_adapter.cc +++ b/src/components/media_manager/src/audio/a2dp_source_player_adapter.cc @@ -77,7 +77,7 @@ A2DPSourcePlayerAdapter::A2DPSourcePlayerAdapter( A2DPSourcePlayerAdapter::~A2DPSourcePlayerAdapter() { for (SourcesMap::iterator it = sources_.begin(); sources_.end() != it; ++it) { Pair pair = it->second; - pair.first->join(); + pair.first->Stop(threads::Thread::kThreadSoftStop); delete pair.second; threads::DeleteThread(pair.first); } @@ -105,7 +105,7 @@ void A2DPSourcePlayerAdapter::StartActivity(int32_t application_key) { threads::Thread* new_activity = threads::CreateThread(mac_address.c_str(), delegate); sources_[application_key] = Pair(new_activity, delegate); - new_activity->start(); + new_activity->Start(); } } @@ -119,7 +119,7 @@ void A2DPSourcePlayerAdapter::StopActivity(int32_t application_key) { SourcesMap::iterator it = sources_.find(application_key); if (sources_.end() != it) { Pair pair = it->second; - pair.first->join(); + pair.first->Stop(threads::Thread::kThreadSoftStop); delete pair.second; threads::DeleteThread(pair.first); current_application_ = 0; diff --git a/src/components/media_manager/src/audio/from_mic_recorder_adapter.cc b/src/components/media_manager/src/audio/from_mic_recorder_adapter.cc index 2575643422..326bc33719 100644 --- a/src/components/media_manager/src/audio/from_mic_recorder_adapter.cc +++ b/src/components/media_manager/src/audio/from_mic_recorder_adapter.cc @@ -53,8 +53,8 @@ FromMicRecorderAdapter::FromMicRecorderAdapter() FromMicRecorderAdapter::~FromMicRecorderAdapter() { LOG4CXX_AUTO_TRACE(logger_); if (recorder_thread_) { - recorder_thread_->join(); - delete recorder_thread_->delegate(); + recorder_thread_->Stop(threads::Thread::kThreadSoftStop); + delete recorder_thread_->GetDelegate(); threads::DeleteThread(recorder_thread_); } } @@ -79,7 +79,7 @@ void FromMicRecorderAdapter::StartActivity(int32_t application_key) { } if (NULL != recorder_thread_) { - recorder_thread_->start(); + recorder_thread_->Start(); current_application_ = application_key; } } @@ -94,8 +94,8 @@ void FromMicRecorderAdapter::StopActivity(int32_t application_key) { } if (recorder_thread_) { - recorder_thread_->join(); - delete recorder_thread_->delegate(); + recorder_thread_->Stop(threads::Thread::kThreadSoftStop); + delete recorder_thread_->GetDelegate(); threads::DeleteThread(recorder_thread_); recorder_thread_ = NULL; } diff --git a/src/components/media_manager/src/audio/from_mic_recorder_listener.cc b/src/components/media_manager/src/audio/from_mic_recorder_listener.cc index 721229b250..a59b3a43f4 100644 --- a/src/components/media_manager/src/audio/from_mic_recorder_listener.cc +++ b/src/components/media_manager/src/audio/from_mic_recorder_listener.cc @@ -50,8 +50,8 @@ FromMicRecorderListener::FromMicRecorderListener( FromMicRecorderListener::~FromMicRecorderListener() { LOG4CXX_AUTO_TRACE(logger_); if (reader_) { - reader_->join(); - delete reader_->delegate(); + reader_->Stop(threads::Thread::kThreadSoftStop); + delete reader_->GetDelegate(); threads::DeleteThread(reader_); } } @@ -75,7 +75,7 @@ void FromMicRecorderListener::OnActivityStarted(int32_t application_key) { reader_ = threads::CreateThread("RecorderSender", thread_delegate); } if (reader_) { - reader_->start(); + reader_->Start(); current_application_ = application_key; } } @@ -90,8 +90,8 @@ void FromMicRecorderListener::OnActivityEnded(int32_t application_key) { return; } if (reader_) { - reader_->join(); - delete reader_->delegate(); + reader_->Stop(threads::Thread::kThreadSoftStop); + delete reader_->GetDelegate(); threads::DeleteThread(reader_); reader_ = NULL; } diff --git a/src/components/media_manager/src/audio/from_mic_to_file_recorder_thread.cc b/src/components/media_manager/src/audio/from_mic_to_file_recorder_thread.cc index 877722fd70..5599743bd3 100644 --- a/src/components/media_manager/src/audio/from_mic_to_file_recorder_thread.cc +++ b/src/components/media_manager/src/audio/from_mic_to_file_recorder_thread.cc @@ -69,8 +69,8 @@ FromMicToFileRecorderThread::FromMicToFileRecorderThread( FromMicToFileRecorderThread::~FromMicToFileRecorderThread() { LOG4CXX_AUTO_TRACE(logger_); if (sleepThread_) { - sleepThread_->join(); - delete sleepThread_->delegate(); + sleepThread_->Stop(threads::Thread::kThreadSoftStop); + delete sleepThread_->GetDelegate(); threads::DeleteThread(sleepThread_); } } @@ -276,7 +276,7 @@ void FromMicToFileRecorderThread::threadMain() { sleepThread_ = threads::CreateThread("SleepThread", new SleepThreadDelegate(timeout)); - sleepThread_->start(); + sleepThread_->Start(); } loop = g_main_loop_new(NULL, FALSE); diff --git a/src/components/media_manager/src/streamer_adapter.cc b/src/components/media_manager/src/streamer_adapter.cc index 3ef12e3ed5..7447929f15 100644 --- a/src/components/media_manager/src/streamer_adapter.cc +++ b/src/components/media_manager/src/streamer_adapter.cc @@ -47,7 +47,7 @@ StreamerAdapter::~StreamerAdapter() { if (streamer_) { streamer_->Close(); } - thread_->join(); + thread_->Stop(threads::Thread::kThreadSoftStop); delete streamer_; threads::DeleteThread(thread_); } @@ -64,7 +64,7 @@ void StreamerAdapter::StartActivity(int32_t application_key) { DCHECK(thread_); const size_t kStackSize = 16384; - thread_->start(threads::ThreadOptions(kStackSize)); + thread_->Start(threads::ThreadOptions(kStackSize)); for (std::set<MediaListenerPtr>::iterator it = media_listeners_.begin(); media_listeners_.end() != it; @@ -87,8 +87,8 @@ void StreamerAdapter::StopActivity(int32_t application_key) { return; } - DCHECK(thread_); - thread_->stop(); + DCHECK(streamer_); + streamer_->exitThreadMain(); for (std::set<MediaListenerPtr>::iterator it = media_listeners_.begin(); media_listeners_.end() != it; diff --git a/src/components/media_manager/src/video/video_stream_to_file_adapter.cc b/src/components/media_manager/src/video/video_stream_to_file_adapter.cc index d907211d16..dd5738b2a2 100644 --- a/src/components/media_manager/src/video/video_stream_to_file_adapter.cc +++ b/src/components/media_manager/src/video/video_stream_to_file_adapter.cc @@ -51,16 +51,16 @@ VideoStreamToFileAdapter::~VideoStreamToFileAdapter() { if ((0 != current_application_) && (is_ready_)) { StopActivity(current_application_); } - thread_->join(); - delete thread_->delegate(); + thread_->Stop(threads::Thread::kThreadSoftStop); + delete thread_->GetDelegate(); threads::DeleteThread(thread_); } void VideoStreamToFileAdapter::Init() { - if (thread_->is_running()) { + if (thread_->IsRunning()) { LOG4CXX_DEBUG(logger, "Start sending thread"); const size_t kStackSize = 16384; - thread_->start(threads::ThreadOptions(kStackSize)); + thread_->Start(threads::ThreadOptions(kStackSize)); } else { LOG4CXX_WARN(logger, "thread is already running"); } diff --git a/src/components/policy/policy_external/src/cache_manager.cc b/src/components/policy/policy_external/src/cache_manager.cc index 039ab5d3d0..164aa6bdf7 100644 --- a/src/components/policy/policy_external/src/cache_manager.cc +++ b/src/components/policy/policy_external/src/cache_manager.cc @@ -284,8 +284,8 @@ CacheManager::CacheManager(bool in_memory) CacheManager::~CacheManager() { LOG4CXX_AUTO_TRACE(logger_); sync_primitives::AutoLock lock(backuper_locker_); - backup_thread_->join(); - delete backup_thread_->delegate(); + backup_thread_->Stop(threads::Thread::kThreadSoftStop); + delete backup_thread_->GetDelegate(); threads::DeleteThread(backup_thread_); } @@ -3156,7 +3156,7 @@ void CacheManager::InitBackupThread() { LOG4CXX_AUTO_TRACE(logger_); backuper_ = new BackgroundBackuper(this); backup_thread_ = threads::CreateThread("Backup thread", backuper_); - backup_thread_->start(); + backup_thread_->Start(); } const PolicySettings& CacheManager::get_settings() const { diff --git a/src/components/policy/policy_external/src/update_status_manager.cc b/src/components/policy/policy_external/src/update_status_manager.cc index d34d1b7817..c50aefd0eb 100644 --- a/src/components/policy/policy_external/src/update_status_manager.cc +++ b/src/components/policy/policy_external/src/update_status_manager.cc @@ -48,14 +48,14 @@ UpdateStatusManager::UpdateStatusManager() update_status_thread_delegate_ = new UpdateThreadDelegate(this); thread_ = threads::CreateThread("UpdateStatusThread", update_status_thread_delegate_); - thread_->start(); + thread_->Start(); } UpdateStatusManager::~UpdateStatusManager() { LOG4CXX_AUTO_TRACE(logger_); DCHECK(update_status_thread_delegate_); DCHECK(thread_); - thread_->join(); + thread_->Stop(threads::Thread::kThreadSoftStop); delete update_status_thread_delegate_; threads::DeleteThread(thread_); } diff --git a/src/components/policy/policy_regular/src/cache_manager.cc b/src/components/policy/policy_regular/src/cache_manager.cc index ed2bbefcd7..7c491a28a5 100644 --- a/src/components/policy/policy_regular/src/cache_manager.cc +++ b/src/components/policy/policy_regular/src/cache_manager.cc @@ -113,14 +113,14 @@ CacheManager::CacheManager() LOG4CXX_AUTO_TRACE(logger_); backuper_ = new BackgroundBackuper(this); backup_thread_ = threads::CreateThread("Backup thread", backuper_); - backup_thread_->start(); + backup_thread_->Start(); } CacheManager::~CacheManager() { LOG4CXX_AUTO_TRACE(logger_); sync_primitives::AutoLock lock(backuper_locker_); - backup_thread_->join(); - delete backup_thread_->delegate(); + backup_thread_->Stop(threads::Thread::kThreadSoftStop); + delete backup_thread_->GetDelegate(); threads::DeleteThread(backup_thread_); } diff --git a/src/components/telemetry_monitor/src/telemetry_monitor.cc b/src/components/telemetry_monitor/src/telemetry_monitor.cc index f6cac49f59..427ecada76 100644 --- a/src/components/telemetry_monitor/src/telemetry_monitor.cc +++ b/src/components/telemetry_monitor/src/telemetry_monitor.cc @@ -65,9 +65,9 @@ void TelemetryMonitor::Start() { void TelemetryMonitor::set_streamer(std::shared_ptr<Streamer> streamer) { LOG4CXX_AUTO_TRACE(logger_); - if (thread_ && !thread_->is_running()) { + if (thread_ && !thread_->IsRunning()) { + thread_->SetDelegate(streamer_.get()); streamer_ = streamer; - thread_->set_delegate(streamer_.get()); } else { LOG4CXX_ERROR(logger_, "Unable to replace streamer if it is active"); } @@ -99,15 +99,14 @@ void TelemetryMonitor::Init( protocol_handler->SetTelemetryObserver(&ph_observer); DCHECK_OR_RETURN_VOID(thread_); - thread_->start(threads::ThreadOptions()); + thread_->Start(threads::ThreadOptions()); } void TelemetryMonitor::Stop() { LOG4CXX_AUTO_TRACE(logger_); if (thread_) { - thread_->stop(); - thread_->join(); - if (thread_->delegate()) { + thread_->Stop(threads::Thread::kThreadSoftStop); + if (thread_->GetDelegate()) { streamer_.reset(); } threads::DeleteThread(thread_); diff --git a/src/components/transport_manager/src/bluetooth/bluetooth_device_scanner.cc b/src/components/transport_manager/src/bluetooth/bluetooth_device_scanner.cc index 4759b2003a..d95ed72b14 100644 --- a/src/components/transport_manager/src/bluetooth/bluetooth_device_scanner.cc +++ b/src/components/transport_manager/src/bluetooth/bluetooth_device_scanner.cc @@ -158,13 +158,13 @@ BluetoothDeviceScanner::BluetoothDeviceScanner( } BluetoothDeviceScanner::~BluetoothDeviceScanner() { - thread_->join(); - delete thread_->delegate(); + thread_->Stop(threads::Thread::kThreadSoftStop); + delete thread_->GetDelegate(); threads::DeleteThread(thread_); } bool BluetoothDeviceScanner::IsInitialised() const { - return thread_ && thread_->is_running(); + return thread_ && thread_->IsRunning(); } void BluetoothDeviceScanner::UpdateTotalDeviceList() { @@ -476,7 +476,7 @@ void BluetoothDeviceScanner::TimedWaitForDeviceScanRequest() { TransportAdapter::Error BluetoothDeviceScanner::Init() { LOG4CXX_AUTO_TRACE(logger_); - if (!thread_->start()) { + if (!thread_->Start()) { LOG4CXX_ERROR(logger_, "Bluetooth device scanner thread start failed"); return TransportAdapter::FAIL; } @@ -495,7 +495,7 @@ void BluetoothDeviceScanner::Terminate() { } LOG4CXX_INFO(logger_, "Waiting for bluetooth device scanner thread termination"); - thread_->stop(); + thread_->Stop(threads::Thread::kThreadStopDelegate); LOG4CXX_INFO(logger_, "Bluetooth device scanner thread stopped"); } } diff --git a/src/components/transport_manager/src/cloud/websocket_client_connection.cc b/src/components/transport_manager/src/cloud/websocket_client_connection.cc index 955ce17f67..81e503131f 100644 --- a/src/components/transport_manager/src/cloud/websocket_client_connection.cc +++ b/src/components/transport_manager/src/cloud/websocket_client_connection.cc @@ -185,7 +185,7 @@ TransportAdapter::Error WebsocketClientConnection::Start() { wss_.binary(true); } #endif // ENABLE_SECURITY - write_thread_->start(threads::ThreadOptions()); + write_thread_->Start(threads::ThreadOptions()); controller_->ConnectDone(device_uid_, app_handle_); // Start async read @@ -289,7 +289,7 @@ void WebsocketClientConnection::Shutdown() { if (thread_delegate_) { thread_delegate_->SetShutdown(); - write_thread_->join(); + write_thread_->Stop(threads::Thread::kThreadSoftStop); delete thread_delegate_; thread_delegate_ = NULL; threads::DeleteThread(write_thread_); diff --git a/src/components/transport_manager/src/iap2_emulation/iap2_transport_adapter.cc b/src/components/transport_manager/src/iap2_emulation/iap2_transport_adapter.cc index 6a43f66c64..e509252dec 100644 --- a/src/components/transport_manager/src/iap2_emulation/iap2_transport_adapter.cc +++ b/src/components/transport_manager/src/iap2_emulation/iap2_transport_adapter.cc @@ -81,16 +81,16 @@ IAP2USBEmulationTransportAdapter::IAP2USBEmulationTransportAdapter( : TcpTransportAdapter(port, last_state_wrapper, settings), out_(0) { auto delegate = new IAPSignalHandlerDelegate(*this); signal_handler_ = threads::CreateThread("iAP signal handler", delegate); - signal_handler_->start(); + signal_handler_->Start(); const auto result = mkfifo(out_signals_channel, mode); UNUSED(result); LOG4CXX_DEBUG(logger_, "Out signals channel creation result: " << result); } IAP2USBEmulationTransportAdapter::~IAP2USBEmulationTransportAdapter() { - signal_handler_->join(); - auto delegate = signal_handler_->delegate(); - signal_handler_->set_delegate(NULL); + signal_handler_->Stop(threads::Thread::kThreadSoftStop); + auto delegate = signal_handler_->GetDelegate(); + signal_handler_->SetDelegate(NULL); delete delegate; threads::DeleteThread(signal_handler_); LOG4CXX_DEBUG(logger_, "Out close result: " << close(out_)); diff --git a/src/components/transport_manager/src/tcp/platform_specific/linux/platform_specific_network_interface_listener.cc b/src/components/transport_manager/src/tcp/platform_specific/linux/platform_specific_network_interface_listener.cc index b37e5dc962..8eaaca868a 100644 --- a/src/components/transport_manager/src/tcp/platform_specific/linux/platform_specific_network_interface_listener.cc +++ b/src/components/transport_manager/src/tcp/platform_specific/linux/platform_specific_network_interface_listener.cc @@ -98,7 +98,7 @@ PlatformSpecificNetworkInterfaceListener:: Stop(); Deinit(); - delete thread_->delegate(); + delete thread_->GetDelegate(); threads::DeleteThread(thread_); } @@ -173,12 +173,12 @@ bool PlatformSpecificNetworkInterfaceListener::Start() { return false; } - if (thread_->is_running()) { + if (thread_->IsRunning()) { LOG4CXX_WARN(logger_, "Interface listener is already started"); return false; } - if (!thread_->start()) { + if (!thread_->Start()) { LOG4CXX_ERROR(logger_, "Failed to start interface listener"); return false; } @@ -190,12 +190,12 @@ bool PlatformSpecificNetworkInterfaceListener::Start() { bool PlatformSpecificNetworkInterfaceListener::Stop() { LOG4CXX_AUTO_TRACE(logger_); - if (!thread_->is_running()) { + if (!thread_->IsRunning()) { LOG4CXX_DEBUG(logger_, "interface listener is not running"); return false; } - thread_->join(); + thread_->Stop(threads::Thread::kThreadStopDelegate); LOG4CXX_INFO(logger_, "Network interface listener stopped"); return true; diff --git a/src/components/transport_manager/src/tcp/tcp_client_listener.cc b/src/components/transport_manager/src/tcp/tcp_client_listener.cc index df4409b8f3..adc6ab3f18 100644 --- a/src/components/transport_manager/src/tcp/tcp_client_listener.cc +++ b/src/components/transport_manager/src/tcp/tcp_client_listener.cc @@ -156,7 +156,7 @@ bool TcpClientListener::IsInitialised() const { TcpClientListener::~TcpClientListener() { LOG4CXX_AUTO_TRACE(logger_); StopListening(); - delete thread_->delegate(); + delete thread_->GetDelegate(); threads::DeleteThread(thread_); Terminate(); delete interface_listener_; @@ -467,7 +467,7 @@ TransportAdapter::Error TcpClientListener::StartListeningThread() { thread_stop_requested_ = false; - if (!thread_->start()) { + if (!thread_->Start()) { return TransportAdapter::FAIL; } return TransportAdapter::OK; @@ -479,7 +479,7 @@ TransportAdapter::Error TcpClientListener::StopListeningThread() { // StopListening() can be called from multiple threads sync_primitives::AutoLock auto_lock(start_stop_lock_); - thread_->join(); + thread_->Stop(threads::Thread::kThreadStopDelegate); close(pipe_fds_[1]); pipe_fds_[1] = -1; diff --git a/src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc b/src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc index 7d96c685f1..637e3f840a 100644 --- a/src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc +++ b/src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc @@ -83,8 +83,8 @@ ThreadedSocketConnection::~ThreadedSocketConnection() { void ThreadedSocketConnection::StopAndJoinThread() { Disconnect(); if (thread_) { - thread_->join(); - delete thread_->delegate(); + thread_->Stop(threads::Thread::kThreadSoftStop); + delete thread_->GetDelegate(); threads::DeleteThread(thread_); thread_ = nullptr; } @@ -115,7 +115,7 @@ TransportAdapter::Error ThreadedSocketConnection::Start() { return TransportAdapter::FAIL; } - if (!thread_->start()) { + if (!thread_->Start()) { LOG4CXX_ERROR(logger_, "thread creation failed"); return TransportAdapter::FAIL; } diff --git a/src/components/transport_manager/src/transport_manager_impl.cc b/src/components/transport_manager/src/transport_manager_impl.cc index c8e5e431fd..62fd75b7dd 100644 --- a/src/components/transport_manager/src/transport_manager_impl.cc +++ b/src/components/transport_manager/src/transport_manager_impl.cc @@ -296,7 +296,7 @@ int TransportManagerImpl::Disconnect(const ConnectionUID cid) { const uint32_t disconnect_timeout = get_settings().transport_manager_disconnect_timeout(); if (disconnect_timeout > 0) { - connection->timer->start(disconnect_timeout); + connection->timer->Start(disconnect_timeout); } } else { connection->transport_adapter->Disconnect(connection->device, diff --git a/src/components/transport_manager/src/usb/libusb/usb_handler.cc b/src/components/transport_manager/src/usb/libusb/usb_handler.cc index d1fc0af7f7..8180d257ee 100644 --- a/src/components/transport_manager/src/usb/libusb/usb_handler.cc +++ b/src/components/transport_manager/src/usb/libusb/usb_handler.cc @@ -100,8 +100,8 @@ UsbHandler::~UsbHandler() { libusb_hotplug_deregister_callback(libusb_context_, left_callback_handle_); } - thread_->join(); - delete thread_->delegate(); + thread_->Stop(threads::Thread::kThreadSoftStop); + delete thread_->GetDelegate(); threads::DeleteThread(thread_); if (libusb_context_) { @@ -324,7 +324,7 @@ TransportAdapter::Error UsbHandler::Init() { return TransportAdapter::FAIL; } - if (!thread_->start()) { + if (!thread_->Start()) { LOG4CXX_ERROR(logger_, "USB device scanner thread start failed, error code"); LOG4CXX_TRACE(logger_, "exit with TransportAdapter::FAIL."); diff --git a/src/components/transport_manager/src/websocket_server/websocket_connection.cc b/src/components/transport_manager/src/websocket_server/websocket_connection.cc index 601110af2f..81985315d2 100644 --- a/src/components/transport_manager/src/websocket_server/websocket_connection.cc +++ b/src/components/transport_manager/src/websocket_server/websocket_connection.cc @@ -59,7 +59,7 @@ WebSocketConnection<WebSocketSession<> >::WebSocketConnection( &message_queue_, [this](Message frame) { session_->WriteDown(frame); })) , thread_(threads::CreateThread("WS Async Send", thread_delegate_)) { - thread_->start(threads::ThreadOptions()); + thread_->Start(threads::ThreadOptions()); } #ifdef ENABLE_SECURITY @@ -85,7 +85,7 @@ WebSocketConnection<WebSocketSecureSession<> >::WebSocketConnection( &message_queue_, [this](Message frame) { session_->WriteDown(frame); })) , thread_(threads::CreateThread("WS Async Send", thread_delegate_)) { - thread_->start(threads::ThreadOptions()); + thread_->Start(threads::ThreadOptions()); } template class WebSocketConnection<WebSocketSecureSession<> >; #endif // ENABLE_SECURITY @@ -168,7 +168,7 @@ void WebSocketConnection<Session>::Shutdown() { if (thread_delegate_) { session_->Shutdown(); thread_delegate_->SetShutdown(); - thread_->join(); + thread_->Stop(threads::Thread::kThreadSoftStop); delete thread_delegate_; thread_delegate_ = nullptr; threads::DeleteThread(thread_); diff --git a/src/components/transport_manager/test/platform_specific/linux/linux_network_interface_listener_test.cc b/src/components/transport_manager/test/platform_specific/linux/linux_network_interface_listener_test.cc index 0f2f0a2045..befba30fbd 100644 --- a/src/components/transport_manager/test/platform_specific/linux/linux_network_interface_listener_test.cc +++ b/src/components/transport_manager/test/platform_specific/linux/linux_network_interface_listener_test.cc @@ -129,7 +129,7 @@ TEST_F(NetworkInterfaceListenerTest, Start_success) { // the "isThreadRunning_" flag of the thread will be update slightly later SleepFor(kThreadStartWaitMsec); - EXPECT_TRUE(interface_listener_impl_->GetThread()->is_running()); + EXPECT_TRUE(interface_listener_impl_->GetThread()->IsRunning()); EXPECT_TRUE(waiter.WaitFor(1, kStartNotificationTimeoutMsec)); @@ -166,7 +166,7 @@ TEST_F(NetworkInterfaceListenerTest, Stop_success) { EXPECT_TRUE(interface_listener_impl_->Stop()); SleepFor(kThreadStartWaitMsec); - EXPECT_FALSE(interface_listener_impl_->GetThread()->is_running()); + EXPECT_FALSE(interface_listener_impl_->GetThread()->IsRunning()); Deinit(); } diff --git a/src/components/transport_manager/test/tcp_client_listener_test.cc b/src/components/transport_manager/test/tcp_client_listener_test.cc index dbd7799b62..d71db3e770 100644 --- a/src/components/transport_manager/test/tcp_client_listener_test.cc +++ b/src/components/transport_manager/test/tcp_client_listener_test.cc @@ -166,9 +166,9 @@ TEST_P(TcpClientListenerTest, StartListening) { SleepFor(kThreadStartWaitMsec); if (InterfaceNameSpecified()) { - EXPECT_FALSE(tcp_client_listener_->thread()->is_running()); + EXPECT_FALSE(tcp_client_listener_->thread()->IsRunning()); } else { - EXPECT_TRUE(tcp_client_listener_->thread()->is_running()); + EXPECT_TRUE(tcp_client_listener_->thread()->IsRunning()); } // Stop() and Deinit() will be called during destructor @@ -200,7 +200,7 @@ TEST_P(TcpClientListenerTest, StopListening) { EXPECT_CALL(*interface_listener_mock_, Stop()).WillOnce(Return(true)); EXPECT_EQ(TransportAdapter::OK, tcp_client_listener_->StopListening()); - EXPECT_FALSE(tcp_client_listener_->thread()->is_running()); + EXPECT_FALSE(tcp_client_listener_->thread()->IsRunning()); EXPECT_CALL(*interface_listener_mock_, Deinit()).Times(1); } @@ -310,7 +310,7 @@ TEST_P(TcpClientListenerTest, OnIPAddressUpdated_ValidIPv4Address) { SleepFor(kThreadStartWaitMsec); - EXPECT_TRUE(tcp_client_listener_->thread()->is_running()); + EXPECT_TRUE(tcp_client_listener_->thread()->IsRunning()); } EXPECT_CALL(*interface_listener_mock_, Stop()).WillOnce(Return(true)); @@ -357,7 +357,7 @@ TEST_P(TcpClientListenerTest, OnIPAddressUpdated_IPv4Address_changed) { SleepFor(kThreadStartWaitMsec); - EXPECT_TRUE(tcp_client_listener_->thread()->is_running()); + EXPECT_TRUE(tcp_client_listener_->thread()->IsRunning()); } EXPECT_CALL(*interface_listener_mock_, Stop()).WillOnce(Return(true)); @@ -403,7 +403,7 @@ TEST_P(TcpClientListenerTest, OnIPAddressUpdated_IPv4Address_same) { SleepFor(kThreadStartWaitMsec); - EXPECT_TRUE(tcp_client_listener_->thread()->is_running()); + EXPECT_TRUE(tcp_client_listener_->thread()->IsRunning()); } EXPECT_CALL(*interface_listener_mock_, Stop()).WillOnce(Return(true)); @@ -450,7 +450,7 @@ TEST_P(TcpClientListenerTest, OnIPAddressUpdated_IPv4Address_disabled) { SleepFor(kThreadStartWaitMsec); - EXPECT_FALSE(tcp_client_listener_->thread()->is_running()); + EXPECT_FALSE(tcp_client_listener_->thread()->IsRunning()); } EXPECT_CALL(*interface_listener_mock_, Stop()).WillOnce(Return(true)); @@ -509,7 +509,7 @@ TEST_P(TcpClientListenerTest, OnIPAddressUpdated_IPv4Address_reenabled) { SleepFor(kThreadStartWaitMsec); - EXPECT_TRUE(tcp_client_listener_->thread()->is_running()); + EXPECT_TRUE(tcp_client_listener_->thread()->IsRunning()); } EXPECT_CALL(*interface_listener_mock_, Stop()).WillOnce(Return(true)); @@ -539,7 +539,7 @@ TEST_P(TcpClientListenerTest, OnIPAddressUpdated_EmptyIPv4Address) { SleepFor(kThreadStartWaitMsec); - EXPECT_FALSE(tcp_client_listener_->thread()->is_running()); + EXPECT_FALSE(tcp_client_listener_->thread()->IsRunning()); } EXPECT_CALL(*interface_listener_mock_, Stop()).WillOnce(Return(true)); diff --git a/src/components/utils/src/threads/async_runner.cc b/src/components/utils/src/threads/async_runner.cc index 740db016be..faa2f24174 100644 --- a/src/components/utils/src/threads/async_runner.cc +++ b/src/components/utils/src/threads/async_runner.cc @@ -44,7 +44,7 @@ AsyncRunner::AsyncRunner(const std::string& thread_name) : executor_(new AsyncRunnerDelegate) { LOG4CXX_AUTO_TRACE(logger_); thread_ = threads::CreateThread(thread_name.c_str(), executor_); - thread_->start(); + thread_->Start(); } void AsyncRunner::AsyncRun(ThreadDelegate* delegate) { @@ -54,12 +54,12 @@ void AsyncRunner::AsyncRun(ThreadDelegate* delegate) { void AsyncRunner::Stop() { LOG4CXX_AUTO_TRACE(logger_); - thread_->join(); + thread_->Stop(threads::Thread::kThreadStopDelegate); } AsyncRunner::~AsyncRunner() { LOG4CXX_AUTO_TRACE(logger_); - thread_->join(); + thread_->Stop(threads::Thread::kThreadSoftStop); delete executor_; threads::DeleteThread(thread_); } diff --git a/src/components/utils/src/threads/thread_delegate.cc b/src/components/utils/src/threads/thread_delegate.cc index e071959522..a23f1b051e 100644 --- a/src/components/utils/src/threads/thread_delegate.cc +++ b/src/components/utils/src/threads/thread_delegate.cc @@ -41,7 +41,7 @@ namespace threads { ThreadDelegate::~ThreadDelegate() { if (thread_) { - thread_->set_delegate(NULL); + thread_->SetDelegate(NULL); } } @@ -50,7 +50,7 @@ void ThreadDelegate::exitThreadMain() { if (thread_->IsCurrentThread()) { pthread_exit(NULL); } else { - pthread_cancel(thread_->thread_handle()); + pthread_cancel(thread_->ThreadHandle()); } thread_ = NULL; } diff --git a/src/components/utils/src/threads/thread_posix.cc b/src/components/utils/src/threads/thread_posix.cc index 35c1cd7084..59ab122ad0 100644 --- a/src/components/utils/src/threads/thread_posix.cc +++ b/src/components/utils/src/threads/thread_posix.cc @@ -60,61 +60,76 @@ size_t Thread::kMinStackSize = void Thread::cleanup(void* arg) { LOG4CXX_AUTO_TRACE(logger_); - Thread* thread = reinterpret_cast<Thread*>(arg); + Thread* thread = static_cast<Thread*>(arg); sync_primitives::AutoLock auto_lock(thread->state_lock_); - thread->isThreadRunning_ = false; + thread->thread_state_ = kThreadStateCompleted; + thread->thread_command_ = kThreadCommandNone; thread->state_cond_.Broadcast(); } void* Thread::threadFunc(void* arg) { - // 0 - state_lock unlocked - // stopped = 0 - // running = 0 - // finalized = 0 - // 4 - state_lock unlocked - // stopped = 1 - // running = 1 - // finalized = 0 - // 5 - state_lock unlocked - // stopped = 1 - // running = 1 - // finalized = 1 - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); - - threads::Thread* thread = reinterpret_cast<Thread*>(arg); + auto thread_procedure_execution = [](Thread* thread) { + thread->thread_state_ = kThreadStateRunning; + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); + pthread_testcancel(); + thread->state_lock_.Release(); + thread->delegate_->threadMain(); + thread->state_lock_.Acquire(); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); + }; + + threads::Thread* thread = static_cast<Thread*>(arg); DCHECK(thread); + thread->state_lock_.Acquire(); + + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_cleanup_push(&cleanup, thread); - thread->state_lock_.Acquire(); thread->state_cond_.Broadcast(); - while (!thread->finalized_) { + // We have special variable for controlling iterations/exiting thread + // in order to separate decision logic (continue iterations or exit?) + // from controlling while cycle + bool continueIterations = true; + + while (continueIterations) { + thread->thread_state_ = kThreadStateIdle; LOG4CXX_DEBUG(logger_, "Thread #" << pthread_self() << " iteration"); - thread->run_cond_.Wait(thread->state_lock_); + thread->state_cond_.Wait(thread->state_lock_); LOG4CXX_DEBUG(logger_, - "Thread #" << pthread_self() << " execute. " - << "stopped_ = " << thread->stopped_ - << "; finalized_ = " << thread->finalized_); - if (!thread->stopped_ && !thread->finalized_) { - thread->isThreadRunning_ = true; - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); - pthread_testcancel(); - - thread->state_lock_.Release(); - thread->delegate_->threadMain(); - thread->state_lock_.Acquire(); - - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); - thread->isThreadRunning_ = false; + "Thread #" + << pthread_self() << " execute. " + << "thread_command_ = " << thread->thread_command_); + + switch (thread->thread_command_) { + case kThreadCommandRun: + thread_procedure_execution(thread); + break; + + case kThreadCommandFinalize: + continueIterations = false; + break; + + default: + LOG4CXX_ERROR(logger_, + "Incorrect thread command: " << thread->thread_command_); + break; } + + thread->thread_command_ = kThreadCommandNone; // consumed thread->state_cond_.Broadcast(); LOG4CXX_DEBUG(logger_, "Thread #" << pthread_self() << " finished iteration"); } thread->state_lock_.Release(); - pthread_cleanup_pop(1); + + const auto execute_cleanup = 1; + // The pthread_cleanup_pop() function shall remove the routine at the top of + // the calling thread's cancellation cleanup stack and optionally invoke it + // (if execute is non-zero). + pthread_cleanup_pop(execute_cleanup); LOG4CXX_DEBUG(logger_, "Thread #" << pthread_self() << " exited successfully"); @@ -139,13 +154,11 @@ Thread::Thread(const char* name, ThreadDelegate* delegate) , delegate_(delegate) , handle_(0) , thread_options_() - , isThreadRunning_(0) - , stopped_(false) - , finalized_(false) - , thread_created_(false) {} + , thread_command_(kThreadCommandNone) + , thread_state_(kThreadStateNone) {} -bool Thread::start() { - return start(thread_options_); +bool Thread::Start() { + return Start(thread_options_); } PlatformThreadHandle Thread::CurrentId() { @@ -153,33 +166,172 @@ PlatformThreadHandle Thread::CurrentId() { } bool Thread::IsCurrentThread() const { - return pthread_equal(CurrentId(), thread_handle()); + return pthread_equal(CurrentId(), ThreadHandle()); } -bool Thread::start(const ThreadOptions& options) { +bool Thread::Start(const ThreadOptions& options) { LOG4CXX_AUTO_TRACE(logger_); sync_primitives::AutoLock auto_lock(state_lock_); - // 1 - state_lock locked - // stopped = 0 - // running = 0 if (!delegate_) { LOG4CXX_ERROR(logger_, "Cannot start thread " << name_ << ": delegate is NULL"); - // 0 - state_lock unlocked return false; } - if (isThreadRunning_) { + if (kThreadStateCompleted == thread_state_) { + LOG4CXX_ERROR(logger_, + "Cannot start thread " << name_ << ": thread completed"); + return false; + } + + if (kThreadStateRunning == thread_state_) { LOG4CXX_TRACE( logger_, "EXIT thread " << name_ << " #" << handle_ << " is already running"); return true; } - thread_options_ = options; + if (!handle_) { + thread_options_ = options; + pthread_attr_t attributes = SetThreadCreationAttributes(&thread_options_); + + int pthread_result = + pthread_create(&handle_, &attributes, threadFunc, this); + pthread_attr_destroy(&attributes); + + if (EOK != pthread_result) { + LOG4CXX_ERROR(logger_, + "Couldn't create thread " + << name_ << ". Error code = " << pthread_result + << " (\"" << strerror(pthread_result) << "\")"); + handle_ = 0; + thread_state_ = kThreadStateError; + return false; + } + + LOG4CXX_DEBUG(logger_, "Created thread: " << name_); + SetNameForId(handle_, name_); + // state_lock 0 + // possible concurrencies: stop and threadFunc + state_cond_.Wait(auto_lock); + } + + if (kThreadCommandFinalize == thread_command_) { + LOG4CXX_DEBUG( + logger_, "Thread " << name_ << " #" << handle_ << " waiting finalize."); + return false; + } + + thread_command_ = kThreadCommandRun; + state_cond_.NotifyOne(); + + LOG4CXX_DEBUG(logger_, + "Thread " << name_ << " #" << handle_ << " started." + << " pthread_result = " << EOK); + return true; +} + +void Thread::SchedYield() { + sched_yield(); +} + +bool Thread::Stop(const ThreadStopOption stop_option) { + LOG4CXX_AUTO_TRACE(logger_); + DCHECK_OR_RETURN( + (kThreadStopDelegate <= stop_option) && (kThreadForceStop >= stop_option), + false); + sync_primitives::AutoLock auto_lock(state_lock_); + thread_command_ = kThreadCommandNone; // cancel all active commands + + if (!handle_ && kThreadStateError != thread_state_) { + LOG4CXX_WARN( + logger_, + "Thread " << name_ << ": can't stopped,thread is not run handle_: " + << handle_ << " thread_state_ is: " << thread_state_); + return false; + } + + if (kThreadStateError == thread_state_ || + kThreadStateCompleted == thread_state_) { + LOG4CXX_WARN(logger_, + "Thread " << name_ << ": can't stopped thread_state_ is: " + << thread_state_); + return false; + } + + LOG4CXX_DEBUG(logger_, + "Stopping thread #" << handle_ << " \"" << name_ << "\""); + + bool result = false; + switch (stop_option) { + case kThreadStopDelegate: + result = StopDelegate(auto_lock); + break; + case kThreadSoftStop: + result = StopSoft(auto_lock); + break; + case kThreadForceStop: + StopForce(auto_lock); + result = true; + break; + default: + LOG4CXX_ERROR(logger_, "Incorrect thread stop option: " << stop_option); + break; + } + + LOG4CXX_DEBUG( + logger_, + "Is thread stopped #" << handle_ << " \"" << name_ << " \": " << result); + return result; +} + +void Thread::Join(const ThreadJoinOption join_option) { + LOG4CXX_AUTO_TRACE(logger_); + DCHECK_OR_RETURN_VOID(!IsCurrentThread()); + DCHECK_OR_RETURN_VOID((kThreadJoinDelegate <= join_option) && + (kThreadJoinThread >= join_option)); + if (!handle_ || kThreadStateError == thread_state_) { + LOG4CXX_WARN(logger_, + "Thread " << name_ << ": is not joinable handle_: " << handle_ + << " thread_state_ is: " << thread_state_); + return; + } + + { + sync_primitives::AutoLock auto_lock(state_lock_); + JoinDelegate(auto_lock); + } + + if (kThreadJoinDelegate == join_option) { + return; + } + + LOG4CXX_DEBUG( + logger_, + "Waiting for #" << handle_ << " to finished thread #" << pthread_self()); + + pthread_join(handle_, NULL); +} + +Thread::~Thread() { + Stop(kThreadForceStop); + Join(kThreadJoinThread); +} +Thread* CreateThread(const char* name, ThreadDelegate* delegate) { + Thread* thread = new Thread(name, delegate); + delegate->set_thread(thread); + return thread; +} + +void DeleteThread(Thread* thread) { + delete thread; +} + +pthread_attr_t Thread::SetThreadCreationAttributes( + ThreadOptions* thread_options) { pthread_attr_t attributes; int pthread_result = pthread_attr_init(&attributes); if (pthread_result != EOK) { @@ -189,7 +341,14 @@ bool Thread::start(const ThreadOptions& options) { << "\")"); } - if (!thread_options_.is_joinable()) { + if (!thread_options) { + return attributes; + } + + if (!thread_options->is_joinable()) { + LOG4CXX_WARN(logger_, + "Set state detach attribute, undefined behavior possible with " + "this attribute"); pthread_result = pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_DETACHED); if (pthread_result != EOK) { @@ -197,11 +356,11 @@ bool Thread::start(const ThreadOptions& options) { "Couldn't set detach state attribute. Error code = " << pthread_result << " (\"" << strerror(pthread_result) << "\")"); - thread_options_.is_joinable(false); + thread_options->is_joinable(false); } } - const size_t stack_size = thread_options_.stack_size(); + const size_t stack_size = thread_options->stack_size(); if (stack_size >= Thread::kMinStackSize) { pthread_result = pthread_attr_setstacksize(&attributes, stack_size); if (pthread_result != EOK) { @@ -212,95 +371,111 @@ bool Thread::start(const ThreadOptions& options) { } } else { ThreadOptions thread_options_temp(Thread::kMinStackSize, - thread_options_.is_joinable()); - thread_options_ = thread_options_temp; + thread_options->is_joinable()); + *thread_options = thread_options_temp; } - if (!thread_created_) { - // state_lock 1 - pthread_result = pthread_create(&handle_, &attributes, threadFunc, this); - if (pthread_result == EOK) { - LOG4CXX_DEBUG(logger_, "Created thread: " << name_); - SetNameForId(handle_, name_); - // state_lock 0 - // possible concurrencies: stop and threadFunc - state_cond_.Wait(auto_lock); - thread_created_ = true; - } else { - LOG4CXX_ERROR(logger_, - "Couldn't create thread " - << name_ << ". Error code = " << pthread_result - << " (\"" << strerror(pthread_result) << "\")"); - } - } - stopped_ = false; - run_cond_.NotifyOne(); - LOG4CXX_DEBUG(logger_, - "Thread " << name_ << " #" << handle_ << " started." - << " pthread_result = " << pthread_result); - pthread_attr_destroy(&attributes); - return pthread_result == EOK; + return attributes; } -void Thread::yield() { - sched_yield(); +bool Thread::StopDelegate(sync_primitives::AutoLock& auto_lock) { + LOG4CXX_AUTO_TRACE(logger_); + + if (kThreadStateRunning != thread_state_) { + LOG4CXX_WARN(logger_, + "Thread " << name_ << ": task can't stopped thread_state_ is: " + << thread_state_); + return false; + } + + if (!delegate_) { + LOG4CXX_WARN(logger_, + "Thread " << name_ << ": task can't stopped delegate is NULL"); + return false; + } + + delegate_->exitThreadMain(); + + JoinDelegate(auto_lock); + + return true; } -void Thread::stop() { +bool Thread::StopSoft(sync_primitives::AutoLock& auto_lock) { LOG4CXX_AUTO_TRACE(logger_); - sync_primitives::AutoLock auto_lock(state_lock_); - stopped_ = true; + if (kThreadStateRunning == thread_state_) { + bool result = StopDelegate(auto_lock); + if (!result) { + return false; + } + } - LOG4CXX_DEBUG(logger_, - "Stopping thread #" << handle_ << " \"" << name_ << "\""); + if (kThreadStateIdle != thread_state_) { + LOG4CXX_WARN(logger_, + "Thread " << name_ << ": can't stopped thread_state_ is: " + << thread_state_); + return false; + } - if (delegate_ && isThreadRunning_) { - delegate_->exitThreadMain(); + thread_command_ = kThreadCommandFinalize; + state_cond_.NotifyOne(); + + if (!pthread_equal(pthread_self(), handle_)) { + LOG4CXX_DEBUG(logger_, + "Waiting for #" << handle_ << " last iteration in thread #" + << pthread_self()); + state_cond_.Wait(auto_lock); } - LOG4CXX_DEBUG(logger_, - "Stopped thread #" << handle_ << " \"" << name_ << " \""); + return true; } -void Thread::join() { +void Thread::StopForce(sync_primitives::AutoLock& auto_lock) { LOG4CXX_AUTO_TRACE(logger_); - DCHECK_OR_RETURN_VOID(!IsCurrentThread()); - stop(); - - sync_primitives::AutoLock auto_lock(state_lock_); - run_cond_.NotifyOne(); - if (isThreadRunning_) { - if (!pthread_equal(pthread_self(), handle_)) { - LOG4CXX_DEBUG(logger_, - "Waiting for #" << handle_ - << " finished iteration in thread #" - << pthread_self()); - state_cond_.Wait(auto_lock); + if (kThreadStateRunning == thread_state_ || + kThreadStateIdle == thread_state_) { + bool result = StopSoft(auto_lock); + if (result) { + return; } } -} + // Notify not to thread but to actor + // that may starting this thread in race condition. + thread_state_ = kThreadStateCompleted; + state_cond_.NotifyOne(); -Thread::~Thread() { - finalized_ = true; - stopped_ = true; - join(); - // in some platforms pthread_join behaviour is undefined when thread is - // not created(pthread_create) and call pthread_join. - if (handle_) { - pthread_join(handle_, NULL); + LOG4CXX_WARN(logger_, + "The thread was not soft stopped, the start of a forced stop"); + + if (!pthread_equal(pthread_self(), handle_)) { + LOG4CXX_DEBUG(logger_, "Thread #" << handle_ << " cancel"); + pthread_cancel(handle_); + } else { + LOG4CXX_DEBUG(logger_, "Thread #" << handle_ << " exit"); + pthread_exit(NULL); + NOTREACHED(); } } -Thread* CreateThread(const char* name, ThreadDelegate* delegate) { - Thread* thread = new Thread(name, delegate); - delegate->set_thread(thread); - return thread; -} +void Thread::JoinDelegate(sync_primitives::AutoLock& auto_lock) { + LOG4CXX_AUTO_TRACE(logger_); + if (kThreadStateRunning != thread_state_) { + LOG4CXX_WARN(logger_, + "Thread " << name_ + << ": delegate is not joinable thread_state_ is: " + << thread_state_); + return; + } -void DeleteThread(Thread* thread) { - delete thread; + if (!pthread_equal(pthread_self(), handle_)) { + LOG4CXX_DEBUG(logger_, + "Waiting for #" << handle_ + << " finished iteration in thread #" + << pthread_self()); + state_cond_.Wait(auto_lock); + } } } // namespace threads diff --git a/src/components/utils/src/timer.cc b/src/components/utils/src/timer.cc index 582458e5c4..b58ddf2d66 100644 --- a/src/components/utils/src/timer.cc +++ b/src/components/utils/src/timer.cc @@ -134,7 +134,7 @@ void timer::Timer::StartThread() { DCHECK_OR_RETURN_VOID(thread_); if (!thread_->IsCurrentThread()) { - thread_->start(); + thread_->Start(); } } @@ -148,7 +148,7 @@ void timer::Timer::StopThread() { delegate_->set_finalized_flag(true); { sync_primitives::AutoUnlock auto_unlock(state_lock_); - thread_->join(); + thread_->Stop(threads::Thread::kThreadStopDelegate); } delegate_->set_finalized_flag(false); } diff --git a/src/components/utils/test/CMakeLists.txt b/src/components/utils/test/CMakeLists.txt index b5127ccabc..2db7d72d84 100644 --- a/src/components/utils/test/CMakeLists.txt +++ b/src/components/utils/test/CMakeLists.txt @@ -78,7 +78,6 @@ endif() # exclude some tests list(APPEND EXCLUDE_PATHS generated_code_with_sqlite_test.cc - posix_thread_test.cc resource_usage_test.cc ) diff --git a/src/components/utils/test/posix_thread_test.cc b/src/components/utils/test/posix_thread_test.cc index 4bf0c8c092..2ba4e764f6 100644 --- a/src/components/utils/test/posix_thread_test.cc +++ b/src/components/utils/test/posix_thread_test.cc @@ -34,296 +34,354 @@ #include "utils/lock.h" #include "utils/threads/thread.h" -namespace test { -namespace components { -namespace utils_test { +namespace threads { using namespace sync_primitives; -using namespace threads; - -// TODO(AByzhynar): Change this to use Gtest class to create all variables for -// every TEST_F -// TODO(AByzhynar): Add multithreading tests namespace { -const uint32_t MAX_SIZE = 20; -const size_t MyStackSize = 32768; -const char* threadName("test thread"); -const std::string test_thread_name("THREAD"); -sync_primitives::ConditionalVariable cond_var_; -sync_primitives::Lock test_mutex_; +const uint32_t kMaxSize = 20; +const size_t kStackSize = 32768; +const char* kThreadName = "test thread"; +const std::string kRenamedThreadName("THREAD"); }; // namespace // ThreadDelegate successor -class TestThreadDelegate : public threads::ThreadDelegate { +class TestThreadDelegate : public ThreadDelegate { public: - TestThreadDelegate() : check_value_(false) {} - void threadMain() { + TestThreadDelegate(const bool idle = false) + : idle_(idle), check_value_(false) {} + void threadMain() override { AutoLock test_lock(test_mutex_); check_value_ = true; cond_var_.NotifyOne(); + if (idle_) { + cond_var_.WaitFor(test_lock, 5000); + cond_var_.NotifyOne(); + } + } + + void exitThreadMain() override { + NotifyOne(); } - bool check_value() const { + bool IsChangedValue() const { return check_value_; } + bool WaitFor(AutoLock& auto_lock, const uint32_t& milliseconds) { + return cond_var_.WaitFor(auto_lock, milliseconds); + } + + void NotifyOne() { + cond_var_.NotifyOne(); + } + + Lock& GetLock() { + return test_mutex_; + } + private: + bool idle_; bool check_value_; + sync_primitives::Lock test_mutex_; + sync_primitives::ConditionalVariable cond_var_; }; -TEST(PosixThreadTest, CreateThread_ExpectThreadCreated) { - // Arrange - threads::Thread* thread = NULL; - TestThreadDelegate* threadDelegate = new TestThreadDelegate(); - // Create thread - ASSERT_NO_THROW(thread = CreateThread(threadName, threadDelegate)); - EXPECT_TRUE(thread != NULL); - EXPECT_EQ(thread, threadDelegate->thread()); - EXPECT_EQ(thread->delegate(), threadDelegate); - DeleteThread(thread); - delete threadDelegate; +class PosixThreadTest : public ::testing::Test { + public: + PosixThreadTest() : thread_delegate_(new TestThreadDelegate()) { + thread_ = CreateThread(kThreadName, thread_delegate_); + } + + ~PosixThreadTest() { + thread_->Stop(Thread::kThreadForceStop); + delete thread_delegate_; + DeleteThread(thread_); + }; + + void ReplaceThreadDelegate(TestThreadDelegate* thread_delegate) { + EXPECT_TRUE(nullptr != thread_); + delete thread_->GetDelegate(); + EXPECT_EQ(nullptr, thread_->GetDelegate()); + + thread_->SetDelegate(thread_delegate); + EXPECT_EQ(thread_delegate, thread_->GetDelegate()); + } + + protected: + TestThreadDelegate* thread_delegate_; + Thread* thread_; +}; + +TEST_F(PosixThreadTest, CreateThread_ExpectThreadCreated) { + EXPECT_TRUE(NULL != thread_); + EXPECT_EQ(thread_, thread_delegate_->thread()); + EXPECT_EQ(thread_delegate_, thread_->GetDelegate()); + + delete thread_delegate_; + thread_delegate_ = nullptr; // Check Delegate Dtor worked successfully - EXPECT_EQ(NULL, thread->delegate()); + EXPECT_EQ(NULL, thread_->GetDelegate()); } -TEST(PosixThreadTest, CheckCreatedThreadName_ExpectCorrectName) { - // Arrange - threads::Thread* thread = NULL; - threads::ThreadDelegate* threadDelegate = new TestThreadDelegate(); - // Create thread - ASSERT_NO_THROW(thread = CreateThread(threadName, threadDelegate)); +TEST_F(PosixThreadTest, CheckCreatedThreadName_ExpectCorrectName) { + EXPECT_TRUE(NULL != thread_); + EXPECT_EQ(thread_, thread_delegate_->thread()); + EXPECT_EQ(thread_delegate_, thread_->GetDelegate()); // Check thread was created with correct name - EXPECT_EQ(threadName, thread->name()); - DeleteThread(thread); - delete threadDelegate; - EXPECT_EQ(NULL, thread->delegate()); + EXPECT_EQ(kThreadName, thread_->GetThreadName()); } -TEST(PosixThreadTest, - CheckCreatedThreadNameChangeToLongName_ExpectThreadNameReduced) { - // Arrange - threads::Thread* thread = NULL; - TestThreadDelegate* threadDelegate = new TestThreadDelegate(); - AutoLock test_lock(test_mutex_); - // Create thread - ASSERT_NO_THROW(thread = CreateThread(threadName, threadDelegate)); - thread->start(threads::ThreadOptions(threads::Thread::kMinStackSize)); - // Rename started thread. Name will be cut to 15 symbols + '\0' - // This is the limit in current POSIX thread implementation - thread->SetNameForId(thread->thread_handle(), - std::string("new thread with changed name")); - // Name must be large enough to keep 16 symbols. Read previous comment - char name[MAX_SIZE]; - int result = pthread_getname_np(thread->thread_handle(), name, sizeof(name)); - if (!result) - EXPECT_EQ(std::string("new thread with"), std::string(name)); - cond_var_.WaitFor(test_lock, 10000); - EXPECT_TRUE(threadDelegate->check_value()); - DeleteThread(thread); - delete threadDelegate; - EXPECT_EQ(NULL, thread->delegate()); +TEST_F(PosixThreadTest, + CheckCreatedThreadNameChangeToLongName_ExpectThreadNameReduced) { + { + auto& lock = thread_delegate_->GetLock(); + AutoLock test_lock(lock); + thread_->Start(ThreadOptions(Thread::kMinStackSize)); + // Rename started thread. Name will be cut to 15 symbols + '\0' + // This is the limit in current POSIX thread implementation + thread_->SetNameForId(thread_->ThreadHandle(), + std::string("new thread with changed name")); + // Name must be large enough to keep 16 symbols. Read previous comment + char name[kMaxSize]; + int result = + pthread_getname_np(thread_->ThreadHandle(), name, sizeof(name)); + if (!result) { + EXPECT_EQ(std::string("new thread with"), std::string(name)); + } + + thread_delegate_->WaitFor(test_lock, 10000); + } + + EXPECT_TRUE(thread_delegate_->IsChangedValue()); } -TEST( +TEST_F( PosixThreadTest, - StartCreatedThreadWithOptionsJoinableAndMyStackSize_ExpectMyStackSizeStackAndJoinableThreadStarted) { - // Arrange - threads::Thread* thread = NULL; - TestThreadDelegate* threadDelegate = new TestThreadDelegate(); - AutoLock test_lock(test_mutex_); - // Create thread - ASSERT_NO_THROW(thread = CreateThread(threadName, threadDelegate)); - // Start thread with following options (Stack size = 32768 & thread is - // joinable) - thread->start(threads::ThreadOptions(MyStackSize)); - // Check thread is joinable - EXPECT_TRUE(thread->is_joinable()); - // Check thread stack size is 32768 - EXPECT_EQ(MyStackSize, thread->stack_size()); - cond_var_.WaitFor(test_lock, 10000); - EXPECT_TRUE(threadDelegate->check_value()); - DeleteThread(thread); - delete threadDelegate; - EXPECT_EQ(NULL, thread->delegate()); + StartCreatedThreadWithOptionsJoinableAndUserStackSize_ExpectUserStackSizeStackAndJoinableThreadStarted) { + { + auto& lock = thread_delegate_->GetLock(); + AutoLock test_lock(lock); + // Start thread with following options (Stack size = 32768 & thread is + // joinable) + thread_->Start(ThreadOptions(kStackSize)); + // Check thread is joinable + EXPECT_TRUE(thread_->IsJoinable()); + // Check thread stack size is 32768 + EXPECT_EQ(kStackSize, thread_->StackSize()); + thread_delegate_->WaitFor(test_lock, 10000); + } + + EXPECT_TRUE(thread_delegate_->IsChangedValue()); } -TEST( +TEST_F( PosixThreadTest, StartCreatedThreadWithDefaultOptions_ExpectZeroStackAndJoinableThreadStarted) { - // Arrange - threads::Thread* thread = NULL; - TestThreadDelegate* threadDelegate = new TestThreadDelegate(); - AutoLock test_lock(test_mutex_); - // Create thread - ASSERT_NO_THROW(thread = CreateThread(threadName, threadDelegate)); - // Start thread with default options (Stack size = 0 & thread is joinable) - thread->start(threads::ThreadOptions()); - // Check thread is joinable - EXPECT_TRUE(thread->is_joinable()); - // Check thread stack size is minimum value. Stack can not be 0 - EXPECT_EQ(Thread::kMinStackSize, thread->stack_size()); - cond_var_.WaitFor(test_lock, 10000); - EXPECT_TRUE(threadDelegate->check_value()); - DeleteThread(thread); - delete threadDelegate; - EXPECT_EQ(NULL, thread->delegate()); + { + auto& lock = thread_delegate_->GetLock(); + AutoLock test_lock(lock); + // Start thread with default options (Stack size = 0 & thread is joinable) + thread_->Start(ThreadOptions()); + // Check thread is joinable + EXPECT_TRUE(thread_->IsJoinable()); + // Check thread stack size is minimum value. Stack can not be 0 + EXPECT_EQ(Thread::kMinStackSize, thread_->StackSize()); + thread_delegate_->WaitFor(test_lock, 10000); + } + + EXPECT_TRUE(thread_delegate_->IsChangedValue()); } -TEST( +TEST_F( PosixThreadTest, StartThreadWithZeroStackAndDetached_ExpectMinimumStackAndDetachedThreadStarted) { - // Arrange - threads::Thread* thread = NULL; - TestThreadDelegate* threadDelegate = new TestThreadDelegate(); - AutoLock test_lock(test_mutex_); - // Create thread - ASSERT_NO_THROW(thread = CreateThread(threadName, threadDelegate)); - // Start thread with default options (Stack size = 0 & thread is detached) - thread->start(threads::ThreadOptions(0, false)); - // Check thread is detached - EXPECT_FALSE(thread->is_joinable()); - // Check thread stack size is 0 - EXPECT_EQ(Thread::kMinStackSize, thread->stack_size()); - cond_var_.WaitFor(test_lock, 10000); - EXPECT_TRUE(threadDelegate->check_value()); - DeleteThread(thread); - delete threadDelegate; - EXPECT_EQ(NULL, thread->delegate()); + { + auto& lock = thread_delegate_->GetLock(); + AutoLock test_lock(lock); + // Start thread with default options (Stack size = 0 & thread is detached) + thread_->Start(ThreadOptions(0, false)); + // Check thread is detached + EXPECT_FALSE(thread_->IsJoinable()); + // Check thread stack size is 0 + EXPECT_EQ(Thread::kMinStackSize, thread_->StackSize()); + thread_delegate_->WaitFor(test_lock, 10000); + } + + EXPECT_TRUE(thread_delegate_->IsChangedValue()); } -TEST( - PosixThreadTest, - DISABLED_CheckCreatedThreadNameChangeToEmpty_ExpectThreadNameChangedToEmpty) { - // Arrange - threads::Thread* thread = NULL; - TestThreadDelegate* threadDelegate = new TestThreadDelegate(); - AutoLock test_lock(test_mutex_); - // Create thread - ASSERT_NO_THROW(thread = CreateThread(threadName, threadDelegate)); - thread->start(threads::ThreadOptions(threads::Thread::kMinStackSize)); - // Rename started thread. Name will be cut to 15 symbols + '\0' - // This is the limit in current POSIX thread implementation - thread->SetNameForId(thread->thread_handle(), std::string("")); - // Name must be large enough to keep 16 symbols. Read previous comment - char name[MAX_SIZE]; - int result = pthread_getname_np(thread->thread_handle(), name, sizeof(name)); - if (!result) { - EXPECT_EQ(std::string(""), std::string(name)); +TEST_F(PosixThreadTest, + CheckCreatedThreadNameChangeToEmpty_ExpectThreadNameChangedToEmpty) { + { + auto& lock = thread_delegate_->GetLock(); + AutoLock test_lock(lock); + thread_->Start(ThreadOptions(Thread::kMinStackSize)); + // Rename started thread. Name will be cut to 15 symbols + '\0' + // This is the limit in current POSIX thread implementation + thread_->SetNameForId(thread_->ThreadHandle(), std::string("")); + // Name must be large enough to keep 16 symbols. Read previous comment + char name[kMaxSize]; + int result = + pthread_getname_np(thread_->ThreadHandle(), name, sizeof(name)); + if (!result) { + EXPECT_TRUE(std::string(name).empty()); + } + thread_delegate_->WaitFor(test_lock, 10000); } - cond_var_.WaitFor(test_lock, 10000); - EXPECT_TRUE(threadDelegate->check_value()); - DeleteThread(thread); - delete threadDelegate; - EXPECT_EQ(NULL, thread->delegate()); + + EXPECT_TRUE(thread_delegate_->IsChangedValue()); } -TEST(PosixThreadTest, - CheckCreatedThreadNameChangeToShortName_ExpectThreadNameChangedToShort) { - // Arrange - threads::Thread* thread = NULL; - TestThreadDelegate* threadDelegate = new TestThreadDelegate(); - AutoLock test_lock(test_mutex_); - // Create thread - ASSERT_NO_THROW(thread = CreateThread(threadName, threadDelegate)); - // Start created thread - thread->start(threads::ThreadOptions(threads::Thread::kMinStackSize)); - // Rename started thread. Name will be cut to 15 symbols + '\0' - // This is the limit in current POSIX thread implementation - thread->SetNameForId(thread->thread_handle(), test_thread_name); - // Name must be large enough to keep 16 symbols. Read previous comment - char name[MAX_SIZE]; - int result = pthread_getname_np(thread->thread_handle(), name, sizeof(name)); - if (!result) { - EXPECT_EQ(test_thread_name, std::string(name)); +TEST_F(PosixThreadTest, + CheckCreatedThreadNameChangeToShortName_ExpectThreadNameChangedToShort) { + { + auto& lock = thread_delegate_->GetLock(); + AutoLock test_lock(lock); + // Start created thread + thread_->Start(ThreadOptions(Thread::kMinStackSize)); + // Rename started thread. Name will be cut to 15 symbols + '\0' + // This is the limit in current POSIX thread implementation + thread_->SetNameForId(thread_->ThreadHandle(), kRenamedThreadName); + // Name must be large enough to keep 16 symbols. Read previous comment + char name[kMaxSize]; + int result = + pthread_getname_np(thread_->ThreadHandle(), name, sizeof(name)); + if (!result) { + EXPECT_EQ(kRenamedThreadName, std::string(name)); + } + thread_delegate_->WaitFor(test_lock, 10000); } - cond_var_.WaitFor(test_lock, 10000); - EXPECT_TRUE(threadDelegate->check_value()); - DeleteThread(thread); - delete threadDelegate; - EXPECT_EQ(NULL, thread->delegate()); + + EXPECT_TRUE(thread_delegate_->IsChangedValue()); } -TEST(PosixThreadTest, StartThread_ExpectThreadStarted) { - // Arrange - threads::Thread* thread = NULL; - TestThreadDelegate* threadDelegate = new TestThreadDelegate(); - AutoLock test_lock(test_mutex_); - // Create thread - ASSERT_NO_THROW(thread = CreateThread(threadName, threadDelegate)); - // Start created thread - EXPECT_TRUE( - thread->start(threads::ThreadOptions(threads::Thread::kMinStackSize))); - cond_var_.WaitFor(test_lock, 10000); - EXPECT_TRUE(threadDelegate->check_value()); - DeleteThread(thread); - delete threadDelegate; - EXPECT_EQ(NULL, thread->delegate()); +TEST_F(PosixThreadTest, StartThread_ExpectThreadStarted) { + { + auto& lock = thread_delegate_->GetLock(); + AutoLock test_lock(lock); + // Start created thread + EXPECT_TRUE(thread_->Start(ThreadOptions(Thread::kMinStackSize))); + thread_delegate_->WaitFor(test_lock, 10000); + } + + EXPECT_TRUE(thread_delegate_->IsChangedValue()); } -TEST(PosixThreadTest, StartOneThreadTwice_ExpectTheSameThreadStartedTwice) { - // Arrange - PlatformThreadHandle thread1_id; - PlatformThreadHandle thread2_id; - threads::Thread* thread = NULL; - TestThreadDelegate* threadDelegate = new TestThreadDelegate(); - AutoLock test_lock(test_mutex_); - // Create thread - ASSERT_NO_THROW(thread = CreateThread(threadName, threadDelegate)); - // Start created thread - EXPECT_TRUE( - thread->start(threads::ThreadOptions(threads::Thread::kMinStackSize))); - thread1_id = thread->CurrentId(); - thread->stop(); - // Try to start thread again - EXPECT_TRUE( - thread->start(threads::ThreadOptions(threads::Thread::kMinStackSize))); - thread2_id = thread->CurrentId(); - EXPECT_EQ(thread1_id, thread2_id); - cond_var_.WaitFor(test_lock, 10000); - EXPECT_TRUE(threadDelegate->check_value()); - DeleteThread(thread); - delete threadDelegate; - EXPECT_EQ(NULL, thread->delegate()); +TEST_F(PosixThreadTest, StartOneThreadTwice_ExpectTheSameThreadStartedTwice) { + { + auto& lock = thread_delegate_->GetLock(); + AutoLock test_lock(lock); + // Start created thread + EXPECT_TRUE(thread_->Start(ThreadOptions(Thread::kMinStackSize))); + auto thread1_id = thread_->CurrentId(); + thread_delegate_->WaitFor(test_lock, 10000); + thread_->Join(Thread::kThreadJoinDelegate); + // Try to start thread again + EXPECT_TRUE(thread_->Start(ThreadOptions(Thread::kMinStackSize))); + auto thread2_id = thread_->CurrentId(); + EXPECT_EQ(thread1_id, thread2_id); + thread_delegate_->WaitFor(test_lock, 10000); + } + + EXPECT_TRUE(thread_delegate_->IsChangedValue()); } -TEST(PosixThreadTest, - StartOneThreadAgainAfterRename_ExpectRenamedThreadStarted) { - // Arrange - PlatformThreadHandle thread1_id; - PlatformThreadHandle thread2_id; - threads::Thread* thread = NULL; - TestThreadDelegate* threadDelegate = new TestThreadDelegate(); - AutoLock test_lock(test_mutex_); - // Create thread - ASSERT_NO_THROW(thread = CreateThread(threadName, threadDelegate)); - // Start created thread - EXPECT_TRUE( - thread->start(threads::ThreadOptions(threads::Thread::kMinStackSize))); - thread1_id = thread->CurrentId(); - // Rename started thread. Name will be cut to 15 symbols + '\0' - // This is the limit in current POSIX thread implementation - thread->SetNameForId(thread->thread_handle(), test_thread_name); - // Name must be large enough to keep 16 symbols. Read previous comment - char name[MAX_SIZE]; - int result = pthread_getname_np(thread->thread_handle(), name, sizeof(name)); - if (!result) - EXPECT_EQ(test_thread_name, std::string(name)); - // Stop thread - thread->stop(); - EXPECT_TRUE( - thread->start(threads::ThreadOptions(threads::Thread::kMinStackSize))); - thread2_id = thread->CurrentId(); - // Expect the same thread started with the the same name - EXPECT_EQ(test_thread_name, std::string(name)); - EXPECT_EQ(thread1_id, thread2_id); - cond_var_.WaitFor(test_lock, 10000); - EXPECT_TRUE(threadDelegate->check_value()); - DeleteThread(thread); - delete threadDelegate; - EXPECT_EQ(NULL, thread->delegate()); +TEST_F(PosixThreadTest, + StartOneThreadAgainAfterRename_ExpectRenamedThreadStarted) { + { + auto& lock = thread_delegate_->GetLock(); + AutoLock test_lock(lock); + // Start created thread + EXPECT_TRUE(thread_->Start(ThreadOptions(Thread::kMinStackSize))); + auto thread1_id = thread_->CurrentId(); + // Rename started thread. Name will be cut to 15 symbols + '\0' + // This is the limit in current POSIX thread implementation + thread_->SetNameForId(thread_->ThreadHandle(), kThreadName); + // Name must be large enough to keep 16 symbols. Read previous comment + char name[kMaxSize]; + int result = + pthread_getname_np(thread_->ThreadHandle(), name, sizeof(name)); + if (!result) { + EXPECT_EQ(kThreadName, std::string(name)); + } + thread_delegate_->WaitFor(test_lock, 10000); + thread_->Join(Thread::kThreadJoinDelegate); + EXPECT_TRUE(thread_->Start(ThreadOptions(Thread::kMinStackSize))); + auto thread2_id = thread_->CurrentId(); + // Expect the same thread started with the the same name + EXPECT_EQ(kThreadName, std::string(name)); + EXPECT_EQ(thread1_id, thread2_id); + thread_delegate_->WaitFor(test_lock, 10000); + } + + EXPECT_TRUE(thread_delegate_->IsChangedValue()); +} + +TEST_F(PosixThreadTest, StartThreadWithNullPtrDelegate_ExpectThreadStateNone) { + thread_->SetDelegate(nullptr); + EXPECT_TRUE(nullptr != thread_); + EXPECT_EQ(nullptr, thread_->GetDelegate()); + + EXPECT_FALSE(thread_->Start()); + EXPECT_FALSE(thread_->IsRunning()); + EXPECT_EQ(Thread::kThreadStateNone, thread_->thread_state_); +} + +TEST_F(PosixThreadTest, + StartThreadExecutingThreadMain_ExpectThreadStateRunning) { + const bool cycled_thread_main = true; + thread_delegate_ = new TestThreadDelegate(cycled_thread_main); + ReplaceThreadDelegate(thread_delegate_); + + { + auto& lock = thread_delegate_->GetLock(); + AutoLock auto_lock(lock); + EXPECT_TRUE(thread_->Start()); + thread_delegate_->WaitFor(auto_lock, 5000); + } + + EXPECT_TRUE(thread_->IsRunning()); + EXPECT_EQ(Thread::kThreadStateRunning, thread_->thread_state_); +} + +TEST_F(PosixThreadTest, + StartThreadExecutingThreadMainCallStopDelegate_ExpectThreadStateIdle) { + const bool cycled_thread_main = true; + thread_delegate_ = new TestThreadDelegate(cycled_thread_main); + ReplaceThreadDelegate(thread_delegate_); + + { + auto& lock = thread_delegate_->GetLock(); + AutoLock auto_lock(lock); + EXPECT_TRUE(thread_->Start()); + thread_delegate_->WaitFor(auto_lock, 5000); + } + + thread_->Stop(Thread::kThreadStopDelegate); + + EXPECT_FALSE(thread_->IsRunning()); + EXPECT_EQ(Thread::kThreadStateIdle, thread_->thread_state_); +} + +TEST_F(PosixThreadTest, + StartThreadExecutingThreadMainCallForceStop_ExpectThreadStateCompleted) { + const bool cycled_thread_main = true; + thread_delegate_ = new TestThreadDelegate(cycled_thread_main); + ReplaceThreadDelegate(thread_delegate_); + + { + auto& lock = thread_delegate_->GetLock(); + AutoLock auto_lock(lock); + EXPECT_TRUE(thread_->Start()); + thread_delegate_->WaitFor(auto_lock, 5000); + } + + thread_->Stop(Thread::kThreadForceStop); + thread_->Join(Thread::kThreadJoinThread); + + EXPECT_FALSE(thread_->IsRunning()); + EXPECT_EQ(Thread::kThreadStateCompleted, thread_->thread_state_); } -} // namespace utils_test -} // namespace components -} // namespace test +} // namespace threads |