diff options
Diffstat (limited to 'src/components/utils/src/threads/posix_thread.cc')
-rw-r--r-- | src/components/utils/src/threads/posix_thread.cc | 251 |
1 files changed, 153 insertions, 98 deletions
diff --git a/src/components/utils/src/threads/posix_thread.cc b/src/components/utils/src/threads/posix_thread.cc index 3f7e006eca..76cc5800e0 100644 --- a/src/components/utils/src/threads/posix_thread.cc +++ b/src/components/utils/src/threads/posix_thread.cc @@ -30,19 +30,21 @@ * POSSIBILITY OF SUCH DAMAGE. */ +#include "utils/threads/thread.h" + #include <errno.h> #include <limits.h> #include <stddef.h> #include <signal.h> +#include <unistd.h> #include "utils/atomic.h" -#include "utils/threads/thread.h" +#include "utils/threads/thread_delegate.h" #include "utils/threads/thread_manager.h" #include "utils/logger.h" #include "pthread.h" - #ifndef __QNXNTO__ const int EOK = 0; #endif @@ -55,50 +57,75 @@ namespace threads { CREATE_LOGGERPTR_GLOBAL(logger_, "Utils") -namespace { - -static void* threadFunc(void* arg) { - LOG4CXX_INFO(logger_, "Thread #" << pthread_self() << " started successfully"); - threads::Thread* thread = static_cast<threads::Thread*>(arg); - threads::ThreadDelegate* delegate = thread->delegate(); - delegate->threadMain(); - thread->set_running(false); - MessageQueue<ThreadManager::ThreadDesc>& threads = ::threads::ThreadManager::instance()->threads_to_terminate; - if (!threads.IsShuttingDown()) { - LOG4CXX_INFO(logger_, "Pushing thread #" << pthread_self() << " to join queue"); - ThreadManager::ThreadDesc desc = { pthread_self(), delegate }; - threads.push(desc); - } - LOG4CXX_INFO(logger_, "Thread #" << pthread_self() << " exited successfully"); - return NULL; -} +size_t Thread::kMinStackSize = PTHREAD_STACK_MIN; /* Ubuntu : 16384 ; QNX : 256; */ +void Thread::cleanup(void* arg) { + LOG4CXX_AUTO_TRACE(logger_); + Thread* thread = reinterpret_cast<Thread*>(arg); + sync_primitives::AutoLock auto_lock(thread->state_lock_); + thread->isThreadRunning_ = false; + thread->state_cond_.Broadcast(); } -size_t Thread::kMinStackSize = PTHREAD_STACK_MIN; /* Ubuntu : 16384 ; QNX : 256; */ - -/* -void ThreadBase::set_name(const std::string name) { - std::string trimname = name.erase(15); - if(pthread_setname_np(thread_handle_, trimname.c_str()) != EOK) { - LOG4CXX_WARN(logger_, "Couldn't set pthread name \"" - << trimname - << "\", error code " - << pthread_result - << " (" - << strerror(pthread_result) - << ")"); +void* Thread::threadFunc(void* arg) { + // 0 - state_lock unlocked + // stopped = 0 + // running = 0 + // finalized = 0 + // 4 - state_lock unlocked + // stopped = 1 + // running = 1 + // finalized = 0 + // 5 - state_lock unlocked + // stopped = 1 + // running = 1 + // finalized = 1 + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); + LOG4CXX_DEBUG(logger_, "Thread #" << pthread_self() << " started successfully"); + + threads::Thread* thread = reinterpret_cast<Thread*>(arg); + DCHECK(thread); + + pthread_cleanup_push(&cleanup, thread); + + thread->state_lock_.Acquire(); + thread->state_cond_.Broadcast(); + + while (!thread->finalized_) { + LOG4CXX_DEBUG(logger_, "Thread #" << pthread_self() << " iteration"); + thread->run_cond_.Wait(thread->state_lock_); + LOG4CXX_DEBUG(logger_, "Thread #" << pthread_self() << " execute. " + << "stopped_ = " << thread->stopped_ << "; finalized_ = " << thread->finalized_); + if (!thread->stopped_ && !thread->finalized_) { + thread->isThreadRunning_ = true; + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); + pthread_testcancel(); + + thread->state_lock_.Release(); + thread->delegate_->threadMain(); + thread->state_lock_.Acquire(); + + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); + thread->isThreadRunning_ = false; + } + thread->state_cond_.Broadcast(); + LOG4CXX_DEBUG(logger_, "Thread #" << pthread_self() << " finished iteration"); } + + thread->state_lock_.Release(); + pthread_cleanup_pop(1); + + LOG4CXX_DEBUG(logger_, "Thread #" << pthread_self() << " exited successfully"); + return NULL; } -*/ -void Thread::SetNameForId(const Id& thread_id, const std::string& name) { - std::string nm = name; - std::string& trimname = nm.size() > 15 ? nm.erase(15) : nm; - const int rc = pthread_setname_np(thread_id.id_, trimname.c_str()); - if(rc == EOK) { +void Thread::SetNameForId(const PlatformThreadHandle& thread_id, std::string name) { + if (name.size() > THREAD_NAME_SIZE) + name.erase(THREAD_NAME_SIZE); + const int rc = pthread_setname_np(thread_id, name.c_str()); + if(rc != EOK) { LOG4CXX_WARN(logger_, "Couldn't set pthread name \"" - << trimname + << name << "\", error code " << rc << " (" @@ -110,46 +137,61 @@ void Thread::SetNameForId(const Id& thread_id, const std::string& name) { Thread::Thread(const char* name, ThreadDelegate* delegate) : name_(name ? name : "undefined"), delegate_(delegate), - thread_handle_(0), + handle_(0), thread_options_(), - isThreadRunning_(0) { -} + isThreadRunning_(0), + stopped_(false), + finalized_(false), + thread_created_(false) { } -ThreadDelegate* Thread::delegate() const { - return delegate_; +bool Thread::start() { + return start(thread_options_); } -bool Thread::start() { - return startWithOptions(thread_options_); +PlatformThreadHandle Thread::CurrentId() { + return pthread_self(); } -Thread::Id Thread::CurrentId() { - return Id(pthread_self()); +void Thread::WaitForRun() { + sync_primitives::AutoLock auto_lock(state_lock_); + run_cond_.Wait(auto_lock); } -bool Thread::startWithOptions(const ThreadOptions& options) { - LOG4CXX_TRACE_ENTER(logger_); +bool Thread::start(const ThreadOptions& options) { + LOG4CXX_AUTO_TRACE(logger_); + + sync_primitives::AutoLock auto_lock(state_lock_); + // 1 - state_lock locked + // stopped = 0 + // running = 0 + if (!delegate_) { - NOTREACHED(); - LOG4CXX_ERROR(logger_, "NULL delegate"); - LOG4CXX_TRACE_EXIT(logger_); + LOG4CXX_ERROR(logger_, "Cannot start thread" << name_ + << ": delegate is NULL"); + // 0 - state_lock unlocked return false; } + if (isThreadRunning_) { + LOG4CXX_TRACE(logger_, "EXIT thread "<< name_ + << " #" << handle_ << "is already runing"); + return true; + } + thread_options_ = options; pthread_attr_t attributes; int pthread_result = pthread_attr_init(&attributes); if (pthread_result != EOK) { LOG4CXX_WARN(logger_,"Couldn't init pthread attributes. Error code = " - << pthread_result << "(\"" << strerror(pthread_result) << "\")"); + << pthread_result << " (\"" << strerror(pthread_result) << "\")"); } if (!thread_options_.is_joinable()) { pthread_result = pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_DETACHED); if (pthread_result != EOK) { - LOG4CXX_WARN(logger_,"Couldn't set detach state attribute.. Error code = " - << pthread_result << "(\"" << strerror(pthread_result) << "\")"); + LOG4CXX_WARN(logger_,"Couldn't set detach state attribute. Error code = " + << pthread_result << " (\"" << strerror(pthread_result) << "\")"); thread_options_.is_joinable(false); } } @@ -159,66 +201,78 @@ bool Thread::startWithOptions(const ThreadOptions& options) { pthread_result = pthread_attr_setstacksize(&attributes, stack_size); if (pthread_result != EOK) { LOG4CXX_WARN(logger_,"Couldn't set stacksize = " << stack_size << - ". Error code = " << pthread_result << "(\"" - << strerror(pthread_result) << "\")"); + ". Error code = " << pthread_result + << " (\"" << strerror(pthread_result) << "\")"); } } - pthread_result = pthread_create(&thread_handle_, &attributes, threadFunc, this); - isThreadRunning_ = (pthread_result == EOK); - if (!isThreadRunning_) { - LOG4CXX_WARN(logger_, "Couldn't create thread. Error code = " - << pthread_result << "(\"" << strerror(pthread_result) << "\")"); - } else { - LOG4CXX_INFO(logger_,"Created thread: " << name_); - SetNameForId(Id(thread_handle_), name_); + if (!thread_created_) { + // state_lock 1 + pthread_result = pthread_create(&handle_, &attributes, threadFunc, this); + if (pthread_result == EOK) { + LOG4CXX_INFO(logger_,"Created thread: " << name_); + SetNameForId(handle_, name_); + // state_lock 0 + // possible concurrencies: stop and threadFunc + state_cond_.Wait(auto_lock); + thread_created_ = true; + } else { + LOG4CXX_ERROR(logger_, "Couldn't create thread " << name_ + << ". Error code = " << pthread_result + << " (\"" << strerror(pthread_result) << "\")"); + } } - LOG4CXX_TRACE_EXIT(logger_); - return isThreadRunning_; + stopped_ = false; + run_cond_.NotifyOne(); + LOG4CXX_DEBUG(logger_,"Thread " << name_ + << " #" << handle_ << " started. pthread_result = " + << pthread_result); + return pthread_result == EOK; } void Thread::stop() { - LOG4CXX_TRACE_ENTER(logger_); + LOG4CXX_AUTO_TRACE(logger_); + sync_primitives::AutoLock auto_lock(state_lock_); - if (!atomic_post_clr(&isThreadRunning_)) - { - return; - } + stopped_ = true; - if (delegate_ && !delegate_->exitThreadMain()) { - if (thread_handle_ != pthread_self()) { - LOG4CXX_WARN(logger_, "Cancelling thread #" << thread_handle_); - const int pthread_result = pthread_cancel(thread_handle_); - if (pthread_result != EOK) { - LOG4CXX_WARN(logger_, - "Couldn't cancel thread (#" << thread_handle_ << " \"" << name_ << - "\") from thread #" << pthread_self() << ". Error code = " - << pthread_result << " (\"" << strerror(pthread_result) << "\")"); - } - } else { - LOG4CXX_ERROR(logger_, - "Couldn't cancel the same thread (#" << thread_handle_ - << "\"" << name_ << "\")"); - } + LOG4CXX_DEBUG(logger_, "Stopping thread #" << handle_ + << " \"" << name_ << " \""); + + if (delegate_ && isThreadRunning_) { + delegate_->exitThreadMain(); } - LOG4CXX_TRACE_EXIT(logger_); + LOG4CXX_DEBUG(logger_, "Stopped thread #" << handle_ + << " \"" << name_ << " \""); } -bool Thread::Id::operator==(const Thread::Id& other) const { - return pthread_equal(id_, other.id_) != 0; -} +void Thread::join() { + LOG4CXX_AUTO_TRACE(logger_); + DCHECK(!pthread_equal(pthread_self(), handle_)); + + stop(); -std::ostream& operator<<(std::ostream& os, const Thread::Id& thread_id) { - char name[32]; - if(pthread_getname_np(thread_id.Handle(), name, 32) == 0) { - os << name; + sync_primitives::AutoLock auto_lock(state_lock_); + run_cond_.NotifyOne(); + if (isThreadRunning_) { + if(!pthread_equal(pthread_self(), handle_)) { + state_cond_.Wait(auto_lock); + } } - return os; +} + +Thread::~Thread() { + finalized_ = true; + stopped_ = true; + join(); + pthread_join(handle_, NULL); } Thread* CreateThread(const char* name, ThreadDelegate* delegate) { - return new Thread(name, delegate); + Thread* thread = new Thread(name, delegate); + delegate->set_thread(thread); + return thread; } void DeleteThread(Thread* thread) { @@ -226,4 +280,5 @@ void DeleteThread(Thread* thread) { } + } // namespace threads |