summaryrefslogtreecommitdiff
path: root/src/components/utils/src/threads/posix_thread.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/components/utils/src/threads/posix_thread.cc')
-rw-r--r--src/components/utils/src/threads/posix_thread.cc293
1 files changed, 176 insertions, 117 deletions
diff --git a/src/components/utils/src/threads/posix_thread.cc b/src/components/utils/src/threads/posix_thread.cc
index 3f7e006ec..3972a40eb 100644
--- a/src/components/utils/src/threads/posix_thread.cc
+++ b/src/components/utils/src/threads/posix_thread.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2014, Ford Motor Company
+ * Copyright (c) 2015, Ford Motor Company
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -31,125 +31,168 @@
*/
#include <errno.h>
-
#include <limits.h>
#include <stddef.h>
#include <signal.h>
-#include "utils/atomic.h"
+#ifdef BUILD_TESTS
+// Temporary fix for UnitTest until APPLINK-9987 is resolved
+#include <unistd.h>
+#endif
+
#include "utils/threads/thread.h"
-#include "utils/threads/thread_manager.h"
-#include "utils/logger.h"
#include "pthread.h"
-
+#include "utils/atomic.h"
+#include "utils/threads/thread_delegate.h"
+#include "utils/logger.h"
#ifndef __QNXNTO__
- const int EOK = 0;
+const int EOK = 0;
#endif
#if defined(OS_POSIX)
- const size_t THREAD_NAME_SIZE = 15;
+const size_t THREAD_NAME_SIZE = 15;
#endif
namespace threads {
CREATE_LOGGERPTR_GLOBAL(logger_, "Utils")
-namespace {
-
-static void* threadFunc(void* arg) {
- LOG4CXX_INFO(logger_, "Thread #" << pthread_self() << " started successfully");
- threads::Thread* thread = static_cast<threads::Thread*>(arg);
- threads::ThreadDelegate* delegate = thread->delegate();
- delegate->threadMain();
- thread->set_running(false);
- MessageQueue<ThreadManager::ThreadDesc>& threads = ::threads::ThreadManager::instance()->threads_to_terminate;
- if (!threads.IsShuttingDown()) {
- LOG4CXX_INFO(logger_, "Pushing thread #" << pthread_self() << " to join queue");
- ThreadManager::ThreadDesc desc = { pthread_self(), delegate };
- threads.push(desc);
- }
- LOG4CXX_INFO(logger_, "Thread #" << pthread_self() << " exited successfully");
- return NULL;
-}
+size_t Thread::kMinStackSize = PTHREAD_STACK_MIN; /* Ubuntu : 16384 ; QNX : 256; */
+void Thread::cleanup(void* arg) {
+ LOG4CXX_AUTO_TRACE(logger_);
+ Thread* thread = reinterpret_cast<Thread*>(arg);
+ sync_primitives::AutoLock auto_lock(thread->state_lock_);
+ thread->isThreadRunning_ = false;
+ thread->state_cond_.Broadcast();
}
-size_t Thread::kMinStackSize = PTHREAD_STACK_MIN; /* Ubuntu : 16384 ; QNX : 256; */
+void* Thread::threadFunc(void* arg) {
+ // 0 - state_lock unlocked
+ // stopped = 0
+ // running = 0
+ // finalized = 0
+ // 4 - state_lock unlocked
+ // stopped = 1
+ // running = 1
+ // finalized = 0
+ // 5 - state_lock unlocked
+ // stopped = 1
+ // running = 1
+ // finalized = 1
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+ LOG4CXX_DEBUG(logger_,
+ "Thread #" << pthread_self() << " started successfully");
+
+ threads::Thread* thread = reinterpret_cast<Thread*>(arg);
+ DCHECK(thread);
+
+ pthread_cleanup_push(&cleanup, thread);
+
+ thread->state_lock_.Acquire();
+ thread->state_cond_.Broadcast();
+
+ while (!thread->finalized_) {
+ LOG4CXX_DEBUG(logger_, "Thread #" << pthread_self() << " iteration");
+ thread->run_cond_.Wait(thread->state_lock_);
+ LOG4CXX_DEBUG(
+ logger_,
+ "Thread #" << pthread_self() << " execute. " << "stopped_ = " << thread->stopped_ << "; finalized_ = " << thread->finalized_);
+ if (!thread->stopped_ && !thread->finalized_) {
+ thread->isThreadRunning_ = true;
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
+ pthread_testcancel();
+
+ thread->state_lock_.Release();
+ thread->delegate_->threadMain();
+ thread->state_lock_.Acquire();
+
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+ thread->isThreadRunning_ = false;
+ }
+ thread->state_cond_.Broadcast();
+ LOG4CXX_DEBUG(logger_,
+ "Thread #" << pthread_self() << " finished iteration");
+ }
-/*
-void ThreadBase::set_name(const std::string name) {
- std::string trimname = name.erase(15);
- if(pthread_setname_np(thread_handle_, trimname.c_str()) != EOK) {
- LOG4CXX_WARN(logger_, "Couldn't set pthread name \""
- << trimname
- << "\", error code "
- << pthread_result
- << " ("
- << strerror(pthread_result)
- << ")");
- }
+ thread->state_lock_.Release();
+ pthread_cleanup_pop(1);
+
+ LOG4CXX_DEBUG(logger_,
+ "Thread #" << pthread_self() << " exited successfully");
+ return NULL;
}
-*/
-
-void Thread::SetNameForId(const Id& thread_id, const std::string& name) {
- std::string nm = name;
- std::string& trimname = nm.size() > 15 ? nm.erase(15) : nm;
- const int rc = pthread_setname_np(thread_id.id_, trimname.c_str());
- if(rc == EOK) {
- LOG4CXX_WARN(logger_, "Couldn't set pthread name \""
- << trimname
- << "\", error code "
- << rc
- << " ("
- << strerror(rc)
- << ")");
+
+void Thread::SetNameForId(const PlatformThreadHandle& thread_id,
+ std::string name) {
+ if (name.size() > THREAD_NAME_SIZE)
+ name.erase(THREAD_NAME_SIZE);
+ const int rc = pthread_setname_np(thread_id, name.c_str());
+ if (rc != EOK) {
+ LOG4CXX_WARN(
+ logger_,
+ "Couldn't set pthread name \"" << name << "\", error code " << rc << " (" << strerror(rc) << ")");
}
}
Thread::Thread(const char* name, ThreadDelegate* delegate)
- : name_(name ? name : "undefined"),
- delegate_(delegate),
- thread_handle_(0),
- thread_options_(),
- isThreadRunning_(0) {
-}
-
-ThreadDelegate* Thread::delegate() const {
- return delegate_;
+ : name_(name ? name : "undefined"),
+ delegate_(delegate),
+ handle_(0),
+ thread_options_(),
+ isThreadRunning_(0),
+ stopped_(false),
+ finalized_(false),
+ thread_created_(false) {
}
bool Thread::start() {
- return startWithOptions(thread_options_);
+ return start(thread_options_);
}
-Thread::Id Thread::CurrentId() {
- return Id(pthread_self());
+PlatformThreadHandle Thread::CurrentId() {
+ return pthread_self();
}
-bool Thread::startWithOptions(const ThreadOptions& options) {
- LOG4CXX_TRACE_ENTER(logger_);
+bool Thread::start(const ThreadOptions& options) {
+ LOG4CXX_AUTO_TRACE(logger_);
+
+ sync_primitives::AutoLock auto_lock(state_lock_);
+ // 1 - state_lock locked
+ // stopped = 0
+ // running = 0
+
if (!delegate_) {
- NOTREACHED();
- LOG4CXX_ERROR(logger_, "NULL delegate");
- LOG4CXX_TRACE_EXIT(logger_);
+ LOG4CXX_ERROR(logger_,
+ "Cannot start thread " << name_ << ": delegate is NULL");
+ // 0 - state_lock unlocked
return false;
}
+ if (isThreadRunning_) {
+ LOG4CXX_TRACE(
+ logger_,
+ "EXIT thread "<< name_ << " #" << handle_ << " is already running");
+ return true;
+ }
+
thread_options_ = options;
pthread_attr_t attributes;
int pthread_result = pthread_attr_init(&attributes);
if (pthread_result != EOK) {
- LOG4CXX_WARN(logger_,"Couldn't init pthread attributes. Error code = "
- << pthread_result << "(\"" << strerror(pthread_result) << "\")");
+ LOG4CXX_WARN(
+ logger_,
+ "Couldn't init pthread attributes. Error code = " << pthread_result << " (\"" << strerror(pthread_result) << "\")");
}
if (!thread_options_.is_joinable()) {
pthread_result = pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_DETACHED);
if (pthread_result != EOK) {
- LOG4CXX_WARN(logger_,"Couldn't set detach state attribute.. Error code = "
- << pthread_result << "(\"" << strerror(pthread_result) << "\")");
+ LOG4CXX_WARN(
+ logger_,
+ "Couldn't set detach state attribute. Error code = " << pthread_result << " (\"" << strerror(pthread_result) << "\")");
thread_options_.is_joinable(false);
}
}
@@ -158,72 +201,88 @@ bool Thread::startWithOptions(const ThreadOptions& options) {
if (stack_size >= Thread::kMinStackSize) {
pthread_result = pthread_attr_setstacksize(&attributes, stack_size);
if (pthread_result != EOK) {
- LOG4CXX_WARN(logger_,"Couldn't set stacksize = " << stack_size <<
- ". Error code = " << pthread_result << "(\""
- << strerror(pthread_result) << "\")");
+ LOG4CXX_WARN(
+ logger_,
+ "Couldn't set stacksize = " << stack_size << ". Error code = " << pthread_result << " (\"" << strerror(pthread_result) << "\")");
}
}
+ else {
+ ThreadOptions thread_options_temp(Thread::kMinStackSize, thread_options_.is_joinable());
+ thread_options_ = thread_options_temp;
+ }
- pthread_result = pthread_create(&thread_handle_, &attributes, threadFunc, this);
- isThreadRunning_ = (pthread_result == EOK);
- if (!isThreadRunning_) {
- LOG4CXX_WARN(logger_, "Couldn't create thread. Error code = "
- << pthread_result << "(\"" << strerror(pthread_result) << "\")");
- } else {
- LOG4CXX_INFO(logger_,"Created thread: " << name_);
- SetNameForId(Id(thread_handle_), name_);
+ if (!thread_created_) {
+ // state_lock 1
+ pthread_result = pthread_create(&handle_, &attributes, threadFunc, this);
+ if (pthread_result == EOK) {
+ LOG4CXX_DEBUG(logger_, "Created thread: " << name_);
+ SetNameForId(handle_, name_);
+ // state_lock 0
+ // possible concurrencies: stop and threadFunc
+ state_cond_.Wait(auto_lock);
+ thread_created_ = true;
+ } else {
+ LOG4CXX_ERROR(
+ logger_,
+ "Couldn't create thread " << name_ << ". Error code = " << pthread_result << " (\"" << strerror(pthread_result) << "\")");
+ }
}
- LOG4CXX_TRACE_EXIT(logger_);
- return isThreadRunning_;
+ stopped_ = false;
+ run_cond_.NotifyOne();
+ LOG4CXX_DEBUG(
+ logger_,
+ "Thread " << name_ << " #" << handle_ << " started. pthread_result = " << pthread_result);
+ pthread_attr_destroy(&attributes);
+ return pthread_result == EOK;
}
void Thread::stop() {
- LOG4CXX_TRACE_ENTER(logger_);
+ LOG4CXX_AUTO_TRACE(logger_);
+ sync_primitives::AutoLock auto_lock(state_lock_);
- if (!atomic_post_clr(&isThreadRunning_))
- {
- return;
- }
+ stopped_ = true;
- if (delegate_ && !delegate_->exitThreadMain()) {
- if (thread_handle_ != pthread_self()) {
- LOG4CXX_WARN(logger_, "Cancelling thread #" << thread_handle_);
- const int pthread_result = pthread_cancel(thread_handle_);
- if (pthread_result != EOK) {
- LOG4CXX_WARN(logger_,
- "Couldn't cancel thread (#" << thread_handle_ << " \"" << name_ <<
- "\") from thread #" << pthread_self() << ". Error code = "
- << pthread_result << " (\"" << strerror(pthread_result) << "\")");
- }
- } else {
- LOG4CXX_ERROR(logger_,
- "Couldn't cancel the same thread (#" << thread_handle_
- << "\"" << name_ << "\")");
- }
+ LOG4CXX_DEBUG(logger_, "Stopping thread #" << handle_
+ << " \"" << name_ << " \"");
+
+ if (delegate_ && isThreadRunning_) {
+ delegate_->exitThreadMain();
}
- LOG4CXX_TRACE_EXIT(logger_);
+ LOG4CXX_DEBUG(logger_,
+ "Stopped thread #" << handle_ << " \"" << name_ << " \"");
}
-bool Thread::Id::operator==(const Thread::Id& other) const {
- return pthread_equal(id_, other.id_) != 0;
-}
+void Thread::join() {
+ LOG4CXX_AUTO_TRACE(logger_);
+ DCHECK(!pthread_equal(pthread_self(), handle_));
-std::ostream& operator<<(std::ostream& os, const Thread::Id& thread_id) {
- char name[32];
- if(pthread_getname_np(thread_id.Handle(), name, 32) == 0) {
- os << name;
+ stop();
+
+ sync_primitives::AutoLock auto_lock(state_lock_);
+ run_cond_.NotifyOne();
+ if (isThreadRunning_) {
+ if (!pthread_equal(pthread_self(), handle_)) {
+ state_cond_.Wait(auto_lock);
+ }
}
- return os;
+}
+
+Thread::~Thread() {
+ finalized_ = true;
+ stopped_ = true;
+ join();
+ pthread_join(handle_, NULL);
}
Thread* CreateThread(const char* name, ThreadDelegate* delegate) {
- return new Thread(name, delegate);
+ Thread* thread = new Thread(name, delegate);
+ delegate->set_thread(thread);
+ return thread;
}
void DeleteThread(Thread* thread) {
delete thread;
}
-
} // namespace threads