diff options
Diffstat (limited to 'src/components/utils/src/threads/posix_thread.cc')
-rw-r--r-- | src/components/utils/src/threads/posix_thread.cc | 299 |
1 files changed, 182 insertions, 117 deletions
diff --git a/src/components/utils/src/threads/posix_thread.cc b/src/components/utils/src/threads/posix_thread.cc index 3f7e006eca..82dd8c59b3 100644 --- a/src/components/utils/src/threads/posix_thread.cc +++ b/src/components/utils/src/threads/posix_thread.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014, Ford Motor Company + * Copyright (c) 2015, Ford Motor Company * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -31,125 +31,168 @@ */ #include <errno.h> - #include <limits.h> #include <stddef.h> #include <signal.h> -#include "utils/atomic.h" +#ifdef BUILD_TESTS +// Temporary fix for UnitTest until APPLINK-9987 is resolved +#include <unistd.h> +#endif + #include "utils/threads/thread.h" -#include "utils/threads/thread_manager.h" -#include "utils/logger.h" #include "pthread.h" - +#include "utils/atomic.h" +#include "utils/threads/thread_delegate.h" +#include "utils/logger.h" #ifndef __QNXNTO__ - const int EOK = 0; +const int EOK = 0; #endif #if defined(OS_POSIX) - const size_t THREAD_NAME_SIZE = 15; +const size_t THREAD_NAME_SIZE = 15; #endif namespace threads { CREATE_LOGGERPTR_GLOBAL(logger_, "Utils") -namespace { - -static void* threadFunc(void* arg) { - LOG4CXX_INFO(logger_, "Thread #" << pthread_self() << " started successfully"); - threads::Thread* thread = static_cast<threads::Thread*>(arg); - threads::ThreadDelegate* delegate = thread->delegate(); - delegate->threadMain(); - thread->set_running(false); - MessageQueue<ThreadManager::ThreadDesc>& threads = ::threads::ThreadManager::instance()->threads_to_terminate; - if (!threads.IsShuttingDown()) { - LOG4CXX_INFO(logger_, "Pushing thread #" << pthread_self() << " to join queue"); - ThreadManager::ThreadDesc desc = { pthread_self(), delegate }; - threads.push(desc); - } - LOG4CXX_INFO(logger_, "Thread #" << pthread_self() << " exited successfully"); - return NULL; -} +size_t Thread::kMinStackSize = PTHREAD_STACK_MIN; /* Ubuntu : 16384 ; QNX : 256; */ +void Thread::cleanup(void* arg) { + LOG4CXX_AUTO_TRACE(logger_); + Thread* thread = reinterpret_cast<Thread*>(arg); + sync_primitives::AutoLock auto_lock(thread->state_lock_); + thread->isThreadRunning_ = false; + thread->state_cond_.Broadcast(); } -size_t Thread::kMinStackSize = PTHREAD_STACK_MIN; /* Ubuntu : 16384 ; QNX : 256; */ +void* Thread::threadFunc(void* arg) { + // 0 - state_lock unlocked + // stopped = 0 + // running = 0 + // finalized = 0 + // 4 - state_lock unlocked + // stopped = 1 + // running = 1 + // finalized = 0 + // 5 - state_lock unlocked + // stopped = 1 + // running = 1 + // finalized = 1 + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); + LOG4CXX_DEBUG(logger_, + "Thread #" << pthread_self() << " started successfully"); + + threads::Thread* thread = reinterpret_cast<Thread*>(arg); + DCHECK(thread); + + pthread_cleanup_push(&cleanup, thread); + + thread->state_lock_.Acquire(); + thread->state_cond_.Broadcast(); + + while (!thread->finalized_) { + LOG4CXX_DEBUG(logger_, "Thread #" << pthread_self() << " iteration"); + thread->run_cond_.Wait(thread->state_lock_); + LOG4CXX_DEBUG( + logger_, + "Thread #" << pthread_self() << " execute. " << "stopped_ = " << thread->stopped_ << "; finalized_ = " << thread->finalized_); + if (!thread->stopped_ && !thread->finalized_) { + thread->isThreadRunning_ = true; + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); + pthread_testcancel(); + + thread->state_lock_.Release(); + thread->delegate_->threadMain(); + thread->state_lock_.Acquire(); + + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); + thread->isThreadRunning_ = false; + } + thread->state_cond_.Broadcast(); + LOG4CXX_DEBUG(logger_, + "Thread #" << pthread_self() << " finished iteration"); + } -/* -void ThreadBase::set_name(const std::string name) { - std::string trimname = name.erase(15); - if(pthread_setname_np(thread_handle_, trimname.c_str()) != EOK) { - LOG4CXX_WARN(logger_, "Couldn't set pthread name \"" - << trimname - << "\", error code " - << pthread_result - << " (" - << strerror(pthread_result) - << ")"); - } + thread->state_lock_.Release(); + pthread_cleanup_pop(1); + + LOG4CXX_DEBUG(logger_, + "Thread #" << pthread_self() << " exited successfully"); + return NULL; } -*/ - -void Thread::SetNameForId(const Id& thread_id, const std::string& name) { - std::string nm = name; - std::string& trimname = nm.size() > 15 ? nm.erase(15) : nm; - const int rc = pthread_setname_np(thread_id.id_, trimname.c_str()); - if(rc == EOK) { - LOG4CXX_WARN(logger_, "Couldn't set pthread name \"" - << trimname - << "\", error code " - << rc - << " (" - << strerror(rc) - << ")"); + +void Thread::SetNameForId(const PlatformThreadHandle& thread_id, + std::string name) { + if (name.size() > THREAD_NAME_SIZE) + name.erase(THREAD_NAME_SIZE); + const int rc = pthread_setname_np(thread_id, name.c_str()); + if (rc != EOK) { + LOG4CXX_WARN( + logger_, + "Couldn't set pthread name \"" << name << "\", error code " << rc << " (" << strerror(rc) << ")"); } } Thread::Thread(const char* name, ThreadDelegate* delegate) - : name_(name ? name : "undefined"), - delegate_(delegate), - thread_handle_(0), - thread_options_(), - isThreadRunning_(0) { -} - -ThreadDelegate* Thread::delegate() const { - return delegate_; + : name_(name ? name : "undefined"), + delegate_(delegate), + handle_(0), + thread_options_(), + isThreadRunning_(0), + stopped_(false), + finalized_(false), + thread_created_(false) { } bool Thread::start() { - return startWithOptions(thread_options_); + return start(thread_options_); } -Thread::Id Thread::CurrentId() { - return Id(pthread_self()); +PlatformThreadHandle Thread::CurrentId() { + return pthread_self(); } -bool Thread::startWithOptions(const ThreadOptions& options) { - LOG4CXX_TRACE_ENTER(logger_); +bool Thread::start(const ThreadOptions& options) { + LOG4CXX_AUTO_TRACE(logger_); + + sync_primitives::AutoLock auto_lock(state_lock_); + // 1 - state_lock locked + // stopped = 0 + // running = 0 + if (!delegate_) { - NOTREACHED(); - LOG4CXX_ERROR(logger_, "NULL delegate"); - LOG4CXX_TRACE_EXIT(logger_); + LOG4CXX_ERROR(logger_, + "Cannot start thread " << name_ << ": delegate is NULL"); + // 0 - state_lock unlocked return false; } + if (isThreadRunning_) { + LOG4CXX_TRACE( + logger_, + "EXIT thread "<< name_ << " #" << handle_ << " is already running"); + return true; + } + thread_options_ = options; pthread_attr_t attributes; int pthread_result = pthread_attr_init(&attributes); if (pthread_result != EOK) { - LOG4CXX_WARN(logger_,"Couldn't init pthread attributes. Error code = " - << pthread_result << "(\"" << strerror(pthread_result) << "\")"); + LOG4CXX_WARN( + logger_, + "Couldn't init pthread attributes. Error code = " << pthread_result << " (\"" << strerror(pthread_result) << "\")"); } if (!thread_options_.is_joinable()) { pthread_result = pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_DETACHED); if (pthread_result != EOK) { - LOG4CXX_WARN(logger_,"Couldn't set detach state attribute.. Error code = " - << pthread_result << "(\"" << strerror(pthread_result) << "\")"); + LOG4CXX_WARN( + logger_, + "Couldn't set detach state attribute. Error code = " << pthread_result << " (\"" << strerror(pthread_result) << "\")"); thread_options_.is_joinable(false); } } @@ -158,72 +201,94 @@ bool Thread::startWithOptions(const ThreadOptions& options) { if (stack_size >= Thread::kMinStackSize) { pthread_result = pthread_attr_setstacksize(&attributes, stack_size); if (pthread_result != EOK) { - LOG4CXX_WARN(logger_,"Couldn't set stacksize = " << stack_size << - ". Error code = " << pthread_result << "(\"" - << strerror(pthread_result) << "\")"); + LOG4CXX_WARN( + logger_, + "Couldn't set stacksize = " << stack_size << ". Error code = " << pthread_result << " (\"" << strerror(pthread_result) << "\")"); } } + else { + ThreadOptions thread_options_temp(Thread::kMinStackSize, thread_options_.is_joinable()); + thread_options_ = thread_options_temp; + } - pthread_result = pthread_create(&thread_handle_, &attributes, threadFunc, this); - isThreadRunning_ = (pthread_result == EOK); - if (!isThreadRunning_) { - LOG4CXX_WARN(logger_, "Couldn't create thread. Error code = " - << pthread_result << "(\"" << strerror(pthread_result) << "\")"); - } else { - LOG4CXX_INFO(logger_,"Created thread: " << name_); - SetNameForId(Id(thread_handle_), name_); + if (!thread_created_) { + // state_lock 1 + pthread_result = pthread_create(&handle_, &attributes, threadFunc, this); + if (pthread_result == EOK) { + LOG4CXX_DEBUG(logger_, "Created thread: " << name_); + SetNameForId(handle_, name_); + // state_lock 0 + // possible concurrencies: stop and threadFunc + state_cond_.Wait(auto_lock); + thread_created_ = true; + } else { + LOG4CXX_ERROR( + logger_, + "Couldn't create thread " << name_ << ". Error code = " << pthread_result << " (\"" << strerror(pthread_result) << "\")"); + } } - LOG4CXX_TRACE_EXIT(logger_); - return isThreadRunning_; + stopped_ = false; + run_cond_.NotifyOne(); + LOG4CXX_DEBUG( + logger_, + "Thread " << name_ << " #" << handle_ << " started. pthread_result = " << pthread_result); + pthread_attr_destroy(&attributes); + return pthread_result == EOK; } void Thread::stop() { - LOG4CXX_TRACE_ENTER(logger_); + LOG4CXX_AUTO_TRACE(logger_); + sync_primitives::AutoLock auto_lock(state_lock_); - if (!atomic_post_clr(&isThreadRunning_)) - { - return; - } +#ifdef BUILD_TESTS + // Temporary fix for UnitTest until APPLINK-9987 is resolved + usleep(100000); +#endif - if (delegate_ && !delegate_->exitThreadMain()) { - if (thread_handle_ != pthread_self()) { - LOG4CXX_WARN(logger_, "Cancelling thread #" << thread_handle_); - const int pthread_result = pthread_cancel(thread_handle_); - if (pthread_result != EOK) { - LOG4CXX_WARN(logger_, - "Couldn't cancel thread (#" << thread_handle_ << " \"" << name_ << - "\") from thread #" << pthread_self() << ". Error code = " - << pthread_result << " (\"" << strerror(pthread_result) << "\")"); - } - } else { - LOG4CXX_ERROR(logger_, - "Couldn't cancel the same thread (#" << thread_handle_ - << "\"" << name_ << "\")"); - } + stopped_ = true; + + LOG4CXX_DEBUG(logger_, "Stopping thread #" << handle_ + + << " \"" << name_ << " \""); + + if (delegate_ && isThreadRunning_) { + delegate_->exitThreadMain(); } - LOG4CXX_TRACE_EXIT(logger_); + LOG4CXX_DEBUG(logger_, + "Stopped thread #" << handle_ << " \"" << name_ << " \""); } -bool Thread::Id::operator==(const Thread::Id& other) const { - return pthread_equal(id_, other.id_) != 0; -} +void Thread::join() { + LOG4CXX_AUTO_TRACE(logger_); + DCHECK(!pthread_equal(pthread_self(), handle_)); -std::ostream& operator<<(std::ostream& os, const Thread::Id& thread_id) { - char name[32]; - if(pthread_getname_np(thread_id.Handle(), name, 32) == 0) { - os << name; + stop(); + + sync_primitives::AutoLock auto_lock(state_lock_); + run_cond_.NotifyOne(); + if (isThreadRunning_) { + if (!pthread_equal(pthread_self(), handle_)) { + state_cond_.Wait(auto_lock); + } } - return os; +} + +Thread::~Thread() { + finalized_ = true; + stopped_ = true; + join(); + pthread_join(handle_, NULL); } Thread* CreateThread(const char* name, ThreadDelegate* delegate) { - return new Thread(name, delegate); + Thread* thread = new Thread(name, delegate); + delegate->set_thread(thread); + return thread; } void DeleteThread(Thread* thread) { delete thread; } - } // namespace threads |