summaryrefslogtreecommitdiff
path: root/src/components/utils/src/threads
diff options
context:
space:
mode:
Diffstat (limited to 'src/components/utils/src/threads')
-rw-r--r--src/components/utils/src/threads/async_runner.cc116
-rw-r--r--src/components/utils/src/threads/posix_thread.cc251
-rw-r--r--src/components/utils/src/threads/pulse_thread_delegate.cc4
-rw-r--r--src/components/utils/src/threads/thread_delegate.cc63
-rw-r--r--src/components/utils/src/threads/thread_validator.cc8
5 files changed, 337 insertions, 105 deletions
diff --git a/src/components/utils/src/threads/async_runner.cc b/src/components/utils/src/threads/async_runner.cc
new file mode 100644
index 0000000000..4a00317911
--- /dev/null
+++ b/src/components/utils/src/threads/async_runner.cc
@@ -0,0 +1,116 @@
+/*
+ Copyright (c) 2014, Ford Motor Company
+ All rights reserved.
+
+ Redistribution and use in source and binary forms, with or without
+ modification, are permitted provided that the following conditions are met:
+
+ Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+
+ Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following
+ disclaimer in the documentation and/or other materials provided with the
+ distribution.
+
+ Neither the name of the Ford Motor Company nor the names of its contributors
+ may be used to endorse or promote products derived from this software
+ without specific prior written permission.
+
+ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "utils/threads/async_runner.h"
+
+#include <string>
+
+#include "utils/logger.h"
+
+namespace threads {
+
+CREATE_LOGGERPTR_GLOBAL(logger_, "AsyncRunner");
+
+AsyncRunner::AsyncRunner(const std::string &thread_name)
+ : executor_(new AsyncRunnerDelegate) {
+ LOG4CXX_AUTO_TRACE(logger_);
+ thread_ = threads::CreateThread(thread_name.c_str(),
+ executor_);
+ thread_->start();
+}
+
+void AsyncRunner::AsyncRun(ThreadDelegate* delegate) {
+ LOG4CXX_AUTO_TRACE(logger_);
+ executor_->runDelegate(delegate);
+}
+
+void AsyncRunner::Stop() {
+ LOG4CXX_AUTO_TRACE(logger_);
+ thread_->join();
+}
+
+AsyncRunner::~AsyncRunner() {
+ LOG4CXX_AUTO_TRACE(logger_);
+ thread_->join();
+ delete executor_;
+ threads::DeleteThread(thread_);
+}
+
+AsyncRunner::AsyncRunnerDelegate::AsyncRunnerDelegate()
+ : stop_flag_(false) {
+}
+
+void AsyncRunner::AsyncRunnerDelegate::processDelegate() {
+ if (!delegates_queue_.empty()) {
+ delegates_queue_lock_.Acquire();
+ ThreadDelegate* run = delegates_queue_.front();
+ delegates_queue_.pop();
+ delegates_queue_lock_.Release();
+
+ if (NULL != run) {
+ run->threadMain();
+ delete run;
+ }
+ }
+}
+
+void AsyncRunner::AsyncRunnerDelegate::waitForDelegate() {
+ LOG4CXX_AUTO_TRACE(logger_);
+ sync_primitives::AutoLock lock(delegates_queue_lock_);
+ if (!stop_flag_ && delegates_queue_.empty()) {
+ delegate_notifier_.Wait(lock);
+ }
+}
+
+void AsyncRunner::AsyncRunnerDelegate::threadMain() {
+ LOG4CXX_AUTO_TRACE(logger_);
+ while (!stop_flag_) {
+ processDelegate();
+ waitForDelegate();
+ }
+}
+
+void AsyncRunner::AsyncRunnerDelegate::exitThreadMain() {
+ LOG4CXX_AUTO_TRACE(logger_);
+ sync_primitives::AutoLock lock(delegates_queue_lock_);
+ stop_flag_ = true;
+ delegate_notifier_.NotifyOne();
+}
+
+void AsyncRunner::AsyncRunnerDelegate::runDelegate(ThreadDelegate* delegate) {
+ LOG4CXX_AUTO_TRACE(logger_);
+ sync_primitives::AutoLock lock(delegates_queue_lock_);
+ delegates_queue_.push(delegate);
+ delegate_notifier_.NotifyOne();
+}
+
+} // namespace policy.
diff --git a/src/components/utils/src/threads/posix_thread.cc b/src/components/utils/src/threads/posix_thread.cc
index 3f7e006eca..76cc5800e0 100644
--- a/src/components/utils/src/threads/posix_thread.cc
+++ b/src/components/utils/src/threads/posix_thread.cc
@@ -30,19 +30,21 @@
* POSSIBILITY OF SUCH DAMAGE.
*/
+#include "utils/threads/thread.h"
+
#include <errno.h>
#include <limits.h>
#include <stddef.h>
#include <signal.h>
+#include <unistd.h>
#include "utils/atomic.h"
-#include "utils/threads/thread.h"
+#include "utils/threads/thread_delegate.h"
#include "utils/threads/thread_manager.h"
#include "utils/logger.h"
#include "pthread.h"
-
#ifndef __QNXNTO__
const int EOK = 0;
#endif
@@ -55,50 +57,75 @@ namespace threads {
CREATE_LOGGERPTR_GLOBAL(logger_, "Utils")
-namespace {
-
-static void* threadFunc(void* arg) {
- LOG4CXX_INFO(logger_, "Thread #" << pthread_self() << " started successfully");
- threads::Thread* thread = static_cast<threads::Thread*>(arg);
- threads::ThreadDelegate* delegate = thread->delegate();
- delegate->threadMain();
- thread->set_running(false);
- MessageQueue<ThreadManager::ThreadDesc>& threads = ::threads::ThreadManager::instance()->threads_to_terminate;
- if (!threads.IsShuttingDown()) {
- LOG4CXX_INFO(logger_, "Pushing thread #" << pthread_self() << " to join queue");
- ThreadManager::ThreadDesc desc = { pthread_self(), delegate };
- threads.push(desc);
- }
- LOG4CXX_INFO(logger_, "Thread #" << pthread_self() << " exited successfully");
- return NULL;
-}
+size_t Thread::kMinStackSize = PTHREAD_STACK_MIN; /* Ubuntu : 16384 ; QNX : 256; */
+void Thread::cleanup(void* arg) {
+ LOG4CXX_AUTO_TRACE(logger_);
+ Thread* thread = reinterpret_cast<Thread*>(arg);
+ sync_primitives::AutoLock auto_lock(thread->state_lock_);
+ thread->isThreadRunning_ = false;
+ thread->state_cond_.Broadcast();
}
-size_t Thread::kMinStackSize = PTHREAD_STACK_MIN; /* Ubuntu : 16384 ; QNX : 256; */
-
-/*
-void ThreadBase::set_name(const std::string name) {
- std::string trimname = name.erase(15);
- if(pthread_setname_np(thread_handle_, trimname.c_str()) != EOK) {
- LOG4CXX_WARN(logger_, "Couldn't set pthread name \""
- << trimname
- << "\", error code "
- << pthread_result
- << " ("
- << strerror(pthread_result)
- << ")");
+void* Thread::threadFunc(void* arg) {
+ // 0 - state_lock unlocked
+ // stopped = 0
+ // running = 0
+ // finalized = 0
+ // 4 - state_lock unlocked
+ // stopped = 1
+ // running = 1
+ // finalized = 0
+ // 5 - state_lock unlocked
+ // stopped = 1
+ // running = 1
+ // finalized = 1
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+ LOG4CXX_DEBUG(logger_, "Thread #" << pthread_self() << " started successfully");
+
+ threads::Thread* thread = reinterpret_cast<Thread*>(arg);
+ DCHECK(thread);
+
+ pthread_cleanup_push(&cleanup, thread);
+
+ thread->state_lock_.Acquire();
+ thread->state_cond_.Broadcast();
+
+ while (!thread->finalized_) {
+ LOG4CXX_DEBUG(logger_, "Thread #" << pthread_self() << " iteration");
+ thread->run_cond_.Wait(thread->state_lock_);
+ LOG4CXX_DEBUG(logger_, "Thread #" << pthread_self() << " execute. "
+ << "stopped_ = " << thread->stopped_ << "; finalized_ = " << thread->finalized_);
+ if (!thread->stopped_ && !thread->finalized_) {
+ thread->isThreadRunning_ = true;
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
+ pthread_testcancel();
+
+ thread->state_lock_.Release();
+ thread->delegate_->threadMain();
+ thread->state_lock_.Acquire();
+
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+ thread->isThreadRunning_ = false;
+ }
+ thread->state_cond_.Broadcast();
+ LOG4CXX_DEBUG(logger_, "Thread #" << pthread_self() << " finished iteration");
}
+
+ thread->state_lock_.Release();
+ pthread_cleanup_pop(1);
+
+ LOG4CXX_DEBUG(logger_, "Thread #" << pthread_self() << " exited successfully");
+ return NULL;
}
-*/
-void Thread::SetNameForId(const Id& thread_id, const std::string& name) {
- std::string nm = name;
- std::string& trimname = nm.size() > 15 ? nm.erase(15) : nm;
- const int rc = pthread_setname_np(thread_id.id_, trimname.c_str());
- if(rc == EOK) {
+void Thread::SetNameForId(const PlatformThreadHandle& thread_id, std::string name) {
+ if (name.size() > THREAD_NAME_SIZE)
+ name.erase(THREAD_NAME_SIZE);
+ const int rc = pthread_setname_np(thread_id, name.c_str());
+ if(rc != EOK) {
LOG4CXX_WARN(logger_, "Couldn't set pthread name \""
- << trimname
+ << name
<< "\", error code "
<< rc
<< " ("
@@ -110,46 +137,61 @@ void Thread::SetNameForId(const Id& thread_id, const std::string& name) {
Thread::Thread(const char* name, ThreadDelegate* delegate)
: name_(name ? name : "undefined"),
delegate_(delegate),
- thread_handle_(0),
+ handle_(0),
thread_options_(),
- isThreadRunning_(0) {
-}
+ isThreadRunning_(0),
+ stopped_(false),
+ finalized_(false),
+ thread_created_(false) { }
-ThreadDelegate* Thread::delegate() const {
- return delegate_;
+bool Thread::start() {
+ return start(thread_options_);
}
-bool Thread::start() {
- return startWithOptions(thread_options_);
+PlatformThreadHandle Thread::CurrentId() {
+ return pthread_self();
}
-Thread::Id Thread::CurrentId() {
- return Id(pthread_self());
+void Thread::WaitForRun() {
+ sync_primitives::AutoLock auto_lock(state_lock_);
+ run_cond_.Wait(auto_lock);
}
-bool Thread::startWithOptions(const ThreadOptions& options) {
- LOG4CXX_TRACE_ENTER(logger_);
+bool Thread::start(const ThreadOptions& options) {
+ LOG4CXX_AUTO_TRACE(logger_);
+
+ sync_primitives::AutoLock auto_lock(state_lock_);
+ // 1 - state_lock locked
+ // stopped = 0
+ // running = 0
+
if (!delegate_) {
- NOTREACHED();
- LOG4CXX_ERROR(logger_, "NULL delegate");
- LOG4CXX_TRACE_EXIT(logger_);
+ LOG4CXX_ERROR(logger_, "Cannot start thread" << name_
+ << ": delegate is NULL");
+ // 0 - state_lock unlocked
return false;
}
+ if (isThreadRunning_) {
+ LOG4CXX_TRACE(logger_, "EXIT thread "<< name_
+ << " #" << handle_ << "is already runing");
+ return true;
+ }
+
thread_options_ = options;
pthread_attr_t attributes;
int pthread_result = pthread_attr_init(&attributes);
if (pthread_result != EOK) {
LOG4CXX_WARN(logger_,"Couldn't init pthread attributes. Error code = "
- << pthread_result << "(\"" << strerror(pthread_result) << "\")");
+ << pthread_result << " (\"" << strerror(pthread_result) << "\")");
}
if (!thread_options_.is_joinable()) {
pthread_result = pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_DETACHED);
if (pthread_result != EOK) {
- LOG4CXX_WARN(logger_,"Couldn't set detach state attribute.. Error code = "
- << pthread_result << "(\"" << strerror(pthread_result) << "\")");
+ LOG4CXX_WARN(logger_,"Couldn't set detach state attribute. Error code = "
+ << pthread_result << " (\"" << strerror(pthread_result) << "\")");
thread_options_.is_joinable(false);
}
}
@@ -159,66 +201,78 @@ bool Thread::startWithOptions(const ThreadOptions& options) {
pthread_result = pthread_attr_setstacksize(&attributes, stack_size);
if (pthread_result != EOK) {
LOG4CXX_WARN(logger_,"Couldn't set stacksize = " << stack_size <<
- ". Error code = " << pthread_result << "(\""
- << strerror(pthread_result) << "\")");
+ ". Error code = " << pthread_result
+ << " (\"" << strerror(pthread_result) << "\")");
}
}
- pthread_result = pthread_create(&thread_handle_, &attributes, threadFunc, this);
- isThreadRunning_ = (pthread_result == EOK);
- if (!isThreadRunning_) {
- LOG4CXX_WARN(logger_, "Couldn't create thread. Error code = "
- << pthread_result << "(\"" << strerror(pthread_result) << "\")");
- } else {
- LOG4CXX_INFO(logger_,"Created thread: " << name_);
- SetNameForId(Id(thread_handle_), name_);
+ if (!thread_created_) {
+ // state_lock 1
+ pthread_result = pthread_create(&handle_, &attributes, threadFunc, this);
+ if (pthread_result == EOK) {
+ LOG4CXX_INFO(logger_,"Created thread: " << name_);
+ SetNameForId(handle_, name_);
+ // state_lock 0
+ // possible concurrencies: stop and threadFunc
+ state_cond_.Wait(auto_lock);
+ thread_created_ = true;
+ } else {
+ LOG4CXX_ERROR(logger_, "Couldn't create thread " << name_
+ << ". Error code = " << pthread_result
+ << " (\"" << strerror(pthread_result) << "\")");
+ }
}
- LOG4CXX_TRACE_EXIT(logger_);
- return isThreadRunning_;
+ stopped_ = false;
+ run_cond_.NotifyOne();
+ LOG4CXX_DEBUG(logger_,"Thread " << name_
+ << " #" << handle_ << " started. pthread_result = "
+ << pthread_result);
+ return pthread_result == EOK;
}
void Thread::stop() {
- LOG4CXX_TRACE_ENTER(logger_);
+ LOG4CXX_AUTO_TRACE(logger_);
+ sync_primitives::AutoLock auto_lock(state_lock_);
- if (!atomic_post_clr(&isThreadRunning_))
- {
- return;
- }
+ stopped_ = true;
- if (delegate_ && !delegate_->exitThreadMain()) {
- if (thread_handle_ != pthread_self()) {
- LOG4CXX_WARN(logger_, "Cancelling thread #" << thread_handle_);
- const int pthread_result = pthread_cancel(thread_handle_);
- if (pthread_result != EOK) {
- LOG4CXX_WARN(logger_,
- "Couldn't cancel thread (#" << thread_handle_ << " \"" << name_ <<
- "\") from thread #" << pthread_self() << ". Error code = "
- << pthread_result << " (\"" << strerror(pthread_result) << "\")");
- }
- } else {
- LOG4CXX_ERROR(logger_,
- "Couldn't cancel the same thread (#" << thread_handle_
- << "\"" << name_ << "\")");
- }
+ LOG4CXX_DEBUG(logger_, "Stopping thread #" << handle_
+ << " \"" << name_ << " \"");
+
+ if (delegate_ && isThreadRunning_) {
+ delegate_->exitThreadMain();
}
- LOG4CXX_TRACE_EXIT(logger_);
+ LOG4CXX_DEBUG(logger_, "Stopped thread #" << handle_
+ << " \"" << name_ << " \"");
}
-bool Thread::Id::operator==(const Thread::Id& other) const {
- return pthread_equal(id_, other.id_) != 0;
-}
+void Thread::join() {
+ LOG4CXX_AUTO_TRACE(logger_);
+ DCHECK(!pthread_equal(pthread_self(), handle_));
+
+ stop();
-std::ostream& operator<<(std::ostream& os, const Thread::Id& thread_id) {
- char name[32];
- if(pthread_getname_np(thread_id.Handle(), name, 32) == 0) {
- os << name;
+ sync_primitives::AutoLock auto_lock(state_lock_);
+ run_cond_.NotifyOne();
+ if (isThreadRunning_) {
+ if(!pthread_equal(pthread_self(), handle_)) {
+ state_cond_.Wait(auto_lock);
+ }
}
- return os;
+}
+
+Thread::~Thread() {
+ finalized_ = true;
+ stopped_ = true;
+ join();
+ pthread_join(handle_, NULL);
}
Thread* CreateThread(const char* name, ThreadDelegate* delegate) {
- return new Thread(name, delegate);
+ Thread* thread = new Thread(name, delegate);
+ delegate->set_thread(thread);
+ return thread;
}
void DeleteThread(Thread* thread) {
@@ -226,4 +280,5 @@ void DeleteThread(Thread* thread) {
}
+
} // namespace threads
diff --git a/src/components/utils/src/threads/pulse_thread_delegate.cc b/src/components/utils/src/threads/pulse_thread_delegate.cc
index 8c580bea83..68db5dcbea 100644
--- a/src/components/utils/src/threads/pulse_thread_delegate.cc
+++ b/src/components/utils/src/threads/pulse_thread_delegate.cc
@@ -91,7 +91,7 @@ void PulseThreadDelegate::threadMain() {
Finalize();
}
-bool PulseThreadDelegate::exitThreadMain() {
+void PulseThreadDelegate::exitThreadMain() {
run_ = false;
LOG4CXX_TRACE(logger_, "Disconnecting from QNX channel " << chid_);
@@ -109,8 +109,6 @@ bool PulseThreadDelegate::exitThreadMain() {
else {
LOG4CXX_WARN(logger_, "Failed to destroy QNX channel " << chid_);
}
-
- return true;
}
} // namespace threads
diff --git a/src/components/utils/src/threads/thread_delegate.cc b/src/components/utils/src/threads/thread_delegate.cc
new file mode 100644
index 0000000000..13271166ff
--- /dev/null
+++ b/src/components/utils/src/threads/thread_delegate.cc
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2014, Ford Motor Company
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * Neither the name of the Ford Motor Company nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "utils/threads/thread_delegate.h"
+
+#include <pthread.h>
+
+#include "utils/threads/thread.h"
+#include "utils/lock.h"
+
+namespace threads {
+
+ThreadDelegate::~ThreadDelegate() {
+ if(thread_) {
+ thread_->set_delegate(NULL);
+ }
+}
+
+void ThreadDelegate::exitThreadMain() {
+ if (thread_) {
+ if (thread_->thread_handle() == pthread_self()) {
+ pthread_exit(NULL);
+ } else {
+ pthread_cancel(thread_->thread_handle());
+ }
+ }
+}
+
+void ThreadDelegate::set_thread(Thread *thread) {
+ DCHECK(thread && !thread->is_running());
+ thread_ = thread;
+}
+
+}
diff --git a/src/components/utils/src/threads/thread_validator.cc b/src/components/utils/src/threads/thread_validator.cc
index 5e9c88a7c9..d10a736078 100644
--- a/src/components/utils/src/threads/thread_validator.cc
+++ b/src/components/utils/src/threads/thread_validator.cc
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright (c) 2013, Ford Motor Company
* All rights reserved.
*
@@ -46,7 +46,7 @@ SingleThreadSimpleValidator::~SingleThreadSimpleValidator() {
}
void SingleThreadSimpleValidator::AssertRunningOnCreationThread() const {
- Thread::Id current_id = Thread::CurrentId();
+ PlatformThreadHandle current_id = Thread::CurrentId();
if (creation_thread_id_ != current_id) {
LOG4CXX_ERROR(logger_, "Single-threaded object created at thread "
<< creation_thread_id_
@@ -68,12 +68,12 @@ SingleThreadValidator::SingleThreadValidator()
SingleThreadValidator::~SingleThreadValidator() {
}
-void SingleThreadValidator::PassToThread(Thread::Id thread_id) const {
+void SingleThreadValidator::PassToThread(PlatformThreadHandle thread_id) const {
owning_thread_id_ = thread_id;
}
void SingleThreadValidator::AssertRunningOnValidThread() const {
- Thread::Id current_id = Thread::CurrentId();
+ PlatformThreadHandle current_id = Thread::CurrentId();
if (owning_thread_id_ != current_id) {
LOG4CXX_ERROR(logger_, "Single-threaded object owned by thread "
<< owning_thread_id_