diff options
author | Zeno Albisser <zeno.albisser@digia.com> | 2013-08-15 21:46:11 +0200 |
---|---|---|
committer | Zeno Albisser <zeno.albisser@digia.com> | 2013-08-15 21:46:11 +0200 |
commit | 679147eead574d186ebf3069647b4c23e8ccace6 (patch) | |
tree | fc247a0ac8ff119f7c8550879ebb6d3dd8d1ff69 /chromium/base/threading | |
download | qtwebengine-chromium-679147eead574d186ebf3069647b4c23e8ccace6.tar.gz |
Initial import.
Diffstat (limited to 'chromium/base/threading')
53 files changed, 9297 insertions, 0 deletions
diff --git a/chromium/base/threading/OWNERS b/chromium/base/threading/OWNERS new file mode 100644 index 00000000000..4198e99c5cc --- /dev/null +++ b/chromium/base/threading/OWNERS @@ -0,0 +1,2 @@ +# For thread_resrictions.* +jam@chromium.org diff --git a/chromium/base/threading/non_thread_safe.h b/chromium/base/threading/non_thread_safe.h new file mode 100644 index 00000000000..cf7a41818da --- /dev/null +++ b/chromium/base/threading/non_thread_safe.h @@ -0,0 +1,73 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef BASE_THREADING_NON_THREAD_SAFE_H_ +#define BASE_THREADING_NON_THREAD_SAFE_H_ + +// Classes deriving from NonThreadSafe may need to suppress MSVC warning 4275: +// non dll-interface class 'Bar' used as base for dll-interface class 'Foo'. +// There is a specific macro to do it: NON_EXPORTED_BASE(), defined in +// compiler_specific.h +#include "base/compiler_specific.h" + +// See comment at top of thread_checker.h +#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)) +#define ENABLE_NON_THREAD_SAFE 1 +#else +#define ENABLE_NON_THREAD_SAFE 0 +#endif + +#if ENABLE_NON_THREAD_SAFE +#include "base/threading/non_thread_safe_impl.h" +#endif + +namespace base { + +// Do nothing implementation of NonThreadSafe, for release mode. +// +// Note: You should almost always use the NonThreadSafe class to get +// the right version of the class for your build configuration. +class NonThreadSafeDoNothing { + public: + bool CalledOnValidThread() const { + return true; + } + + protected: + ~NonThreadSafeDoNothing() {} + void DetachFromThread() {} +}; + +// NonThreadSafe is a helper class used to help verify that methods of a +// class are called from the same thread. One can inherit from this class +// and use CalledOnValidThread() to verify. +// +// This is intended to be used with classes that appear to be thread safe, but +// aren't. For example, a service or a singleton like the preferences system. +// +// Example: +// class MyClass : public base::NonThreadSafe { +// public: +// void Foo() { +// DCHECK(CalledOnValidThread()); +// ... (do stuff) ... +// } +// } +// +// Note that base::ThreadChecker offers identical functionality to +// NonThreadSafe, but does not require inheritence. In general, it is preferable +// to have a base::ThreadChecker as a member, rather than inherit from +// NonThreadSafe. For more details about when to choose one over the other, see +// the documentation for base::ThreadChecker. +#if ENABLE_NON_THREAD_SAFE +typedef NonThreadSafeImpl NonThreadSafe; +#else +typedef NonThreadSafeDoNothing NonThreadSafe; +#endif // ENABLE_NON_THREAD_SAFE + +#undef ENABLE_NON_THREAD_SAFE + +} // namespace base + +#endif // BASE_NON_THREAD_SAFE_H_ diff --git a/chromium/base/threading/non_thread_safe_impl.cc b/chromium/base/threading/non_thread_safe_impl.cc new file mode 100644 index 00000000000..7e729d9ee42 --- /dev/null +++ b/chromium/base/threading/non_thread_safe_impl.cc @@ -0,0 +1,23 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/non_thread_safe_impl.h" + +#include "base/logging.h" + +namespace base { + +bool NonThreadSafeImpl::CalledOnValidThread() const { + return thread_checker_.CalledOnValidThread(); +} + +NonThreadSafeImpl::~NonThreadSafeImpl() { + DCHECK(CalledOnValidThread()); +} + +void NonThreadSafeImpl::DetachFromThread() { + thread_checker_.DetachFromThread(); +} + +} // namespace base diff --git a/chromium/base/threading/non_thread_safe_impl.h b/chromium/base/threading/non_thread_safe_impl.h new file mode 100644 index 00000000000..a3a356df4a5 --- /dev/null +++ b/chromium/base/threading/non_thread_safe_impl.h @@ -0,0 +1,39 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef BASE_THREADING_NON_THREAD_SAFE_IMPL_H_ +#define BASE_THREADING_NON_THREAD_SAFE_IMPL_H_ + +#include "base/base_export.h" +#include "base/threading/thread_checker_impl.h" + +namespace base { + +// Full implementation of NonThreadSafe, for debug mode or for occasional +// temporary use in release mode e.g. when you need to CHECK on a thread +// bug that only occurs in the wild. +// +// Note: You should almost always use the NonThreadSafe class to get +// the right version of the class for your build configuration. +class BASE_EXPORT NonThreadSafeImpl { + public: + bool CalledOnValidThread() const; + + protected: + ~NonThreadSafeImpl(); + + // Changes the thread that is checked for in CalledOnValidThread. The next + // call to CalledOnValidThread will attach this class to a new thread. It is + // up to the NonThreadSafe derived class to decide to expose this or not. + // This may be useful when an object may be created on one thread and then + // used exclusively on another thread. + void DetachFromThread(); + + private: + ThreadCheckerImpl thread_checker_; +}; + +} // namespace base + +#endif // BASE_THREADING_NON_THREAD_SAFE_IMPL_H_ diff --git a/chromium/base/threading/non_thread_safe_unittest.cc b/chromium/base/threading/non_thread_safe_unittest.cc new file mode 100644 index 00000000000..8a82a637ab7 --- /dev/null +++ b/chromium/base/threading/non_thread_safe_unittest.cc @@ -0,0 +1,167 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/basictypes.h" +#include "base/logging.h" +#include "base/memory/scoped_ptr.h" +#include "base/threading/non_thread_safe.h" +#include "base/threading/simple_thread.h" +#include "testing/gtest/include/gtest/gtest.h" + +// Duplicated from base/threading/non_thread_safe.h so that we can be +// good citizens there and undef the macro. +#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)) +#define ENABLE_NON_THREAD_SAFE 1 +#else +#define ENABLE_NON_THREAD_SAFE 0 +#endif + +namespace base { + +namespace { + +// Simple class to exersice the basics of NonThreadSafe. +// Both the destructor and DoStuff should verify that they were +// called on the same thread as the constructor. +class NonThreadSafeClass : public NonThreadSafe { + public: + NonThreadSafeClass() {} + + // Verifies that it was called on the same thread as the constructor. + void DoStuff() { + DCHECK(CalledOnValidThread()); + } + + void DetachFromThread() { + NonThreadSafe::DetachFromThread(); + } + + static void MethodOnDifferentThreadImpl(); + static void DestructorOnDifferentThreadImpl(); + + private: + DISALLOW_COPY_AND_ASSIGN(NonThreadSafeClass); +}; + +// Calls NonThreadSafeClass::DoStuff on another thread. +class CallDoStuffOnThread : public SimpleThread { + public: + explicit CallDoStuffOnThread(NonThreadSafeClass* non_thread_safe_class) + : SimpleThread("call_do_stuff_on_thread"), + non_thread_safe_class_(non_thread_safe_class) { + } + + virtual void Run() OVERRIDE { + non_thread_safe_class_->DoStuff(); + } + + private: + NonThreadSafeClass* non_thread_safe_class_; + + DISALLOW_COPY_AND_ASSIGN(CallDoStuffOnThread); +}; + +// Deletes NonThreadSafeClass on a different thread. +class DeleteNonThreadSafeClassOnThread : public SimpleThread { + public: + explicit DeleteNonThreadSafeClassOnThread( + NonThreadSafeClass* non_thread_safe_class) + : SimpleThread("delete_non_thread_safe_class_on_thread"), + non_thread_safe_class_(non_thread_safe_class) { + } + + virtual void Run() OVERRIDE { + non_thread_safe_class_.reset(); + } + + private: + scoped_ptr<NonThreadSafeClass> non_thread_safe_class_; + + DISALLOW_COPY_AND_ASSIGN(DeleteNonThreadSafeClassOnThread); +}; + +} // namespace + +TEST(NonThreadSafeTest, CallsAllowedOnSameThread) { + scoped_ptr<NonThreadSafeClass> non_thread_safe_class( + new NonThreadSafeClass); + + // Verify that DoStuff doesn't assert. + non_thread_safe_class->DoStuff(); + + // Verify that the destructor doesn't assert. + non_thread_safe_class.reset(); +} + +TEST(NonThreadSafeTest, DetachThenDestructOnDifferentThread) { + scoped_ptr<NonThreadSafeClass> non_thread_safe_class( + new NonThreadSafeClass); + + // Verify that the destructor doesn't assert when called on a different thread + // after a detach. + non_thread_safe_class->DetachFromThread(); + DeleteNonThreadSafeClassOnThread delete_on_thread( + non_thread_safe_class.release()); + + delete_on_thread.Start(); + delete_on_thread.Join(); +} + +#if GTEST_HAS_DEATH_TEST || !ENABLE_NON_THREAD_SAFE + +void NonThreadSafeClass::MethodOnDifferentThreadImpl() { + scoped_ptr<NonThreadSafeClass> non_thread_safe_class( + new NonThreadSafeClass); + + // Verify that DoStuff asserts in debug builds only when called + // on a different thread. + CallDoStuffOnThread call_on_thread(non_thread_safe_class.get()); + + call_on_thread.Start(); + call_on_thread.Join(); +} + +#if ENABLE_NON_THREAD_SAFE +TEST(NonThreadSafeDeathTest, MethodNotAllowedOnDifferentThreadInDebug) { + ASSERT_DEATH({ + NonThreadSafeClass::MethodOnDifferentThreadImpl(); + }, ""); +} +#else +TEST(NonThreadSafeTest, MethodAllowedOnDifferentThreadInRelease) { + NonThreadSafeClass::MethodOnDifferentThreadImpl(); +} +#endif // ENABLE_NON_THREAD_SAFE + +void NonThreadSafeClass::DestructorOnDifferentThreadImpl() { + scoped_ptr<NonThreadSafeClass> non_thread_safe_class( + new NonThreadSafeClass); + + // Verify that the destructor asserts in debug builds only + // when called on a different thread. + DeleteNonThreadSafeClassOnThread delete_on_thread( + non_thread_safe_class.release()); + + delete_on_thread.Start(); + delete_on_thread.Join(); +} + +#if ENABLE_NON_THREAD_SAFE +TEST(NonThreadSafeDeathTest, DestructorNotAllowedOnDifferentThreadInDebug) { + ASSERT_DEATH({ + NonThreadSafeClass::DestructorOnDifferentThreadImpl(); + }, ""); +} +#else +TEST(NonThreadSafeTest, DestructorAllowedOnDifferentThreadInRelease) { + NonThreadSafeClass::DestructorOnDifferentThreadImpl(); +} +#endif // ENABLE_NON_THREAD_SAFE + +#endif // GTEST_HAS_DEATH_TEST || !ENABLE_NON_THREAD_SAFE + +// Just in case we ever get lumped together with other compilation units. +#undef ENABLE_NON_THREAD_SAFE + +} // namespace base diff --git a/chromium/base/threading/platform_thread.h b/chromium/base/threading/platform_thread.h new file mode 100644 index 00000000000..9742d570c6f --- /dev/null +++ b/chromium/base/threading/platform_thread.h @@ -0,0 +1,160 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// WARNING: You should *NOT* be using this class directly. PlatformThread is +// the low-level platform-specific abstraction to the OS's threading interface. +// You should instead be using a message-loop driven Thread, see thread.h. + +#ifndef BASE_THREADING_PLATFORM_THREAD_H_ +#define BASE_THREADING_PLATFORM_THREAD_H_ + +#include "base/base_export.h" +#include "base/basictypes.h" +#include "base/time/time.h" +#include "build/build_config.h" + +#if defined(OS_WIN) +#include <windows.h> +#elif defined(OS_POSIX) +#include <pthread.h> +#include <unistd.h> +#endif + +namespace base { + +#if defined(OS_WIN) +typedef DWORD PlatformThreadId; +#elif defined(OS_POSIX) +typedef pid_t PlatformThreadId; +#endif + +class PlatformThreadHandle { + public: +#if defined(OS_WIN) + typedef void* Handle; +#elif defined(OS_POSIX) + typedef pthread_t Handle; +#endif + + PlatformThreadHandle() + : handle_(0), + id_(0) { + } + + explicit PlatformThreadHandle(Handle handle) + : handle_(handle), + id_(0) { + } + + PlatformThreadHandle(Handle handle, + PlatformThreadId id) + : handle_(handle), + id_(id) { + } + + bool is_equal(const PlatformThreadHandle& other) { + return handle_ == other.handle_; + } + + bool is_null() { + return !handle_; + } + + Handle platform_handle() { + return handle_; + } + + private: + friend class PlatformThread; + + Handle handle_; + PlatformThreadId id_; +}; + +const PlatformThreadId kInvalidThreadId(0); + +// Valid values for SetThreadPriority() +enum ThreadPriority{ + kThreadPriority_Normal, + // Suitable for low-latency, glitch-resistant audio. + kThreadPriority_RealtimeAudio, + // Suitable for threads which generate data for the display (at ~60Hz). + kThreadPriority_Display, + // Suitable for threads that shouldn't disrupt high priority work. + kThreadPriority_Background +}; + +// A namespace for low-level thread functions. +class BASE_EXPORT PlatformThread { + public: + // Implement this interface to run code on a background thread. Your + // ThreadMain method will be called on the newly created thread. + class BASE_EXPORT Delegate { + public: + virtual void ThreadMain() = 0; + + protected: + virtual ~Delegate() {} + }; + + // Gets the current thread id, which may be useful for logging purposes. + static PlatformThreadId CurrentId(); + + // Get the current handle. + static PlatformThreadHandle CurrentHandle(); + + // Yield the current thread so another thread can be scheduled. + static void YieldCurrentThread(); + + // Sleeps for the specified duration. + static void Sleep(base::TimeDelta duration); + + // Sets the thread name visible to debuggers/tools. This has no effect + // otherwise. This name pointer is not copied internally. Thus, it must stay + // valid until the thread ends. + static void SetName(const char* name); + + // Gets the thread name, if previously set by SetName. + static const char* GetName(); + + // Creates a new thread. The |stack_size| parameter can be 0 to indicate + // that the default stack size should be used. Upon success, + // |*thread_handle| will be assigned a handle to the newly created thread, + // and |delegate|'s ThreadMain method will be executed on the newly created + // thread. + // NOTE: When you are done with the thread handle, you must call Join to + // release system resources associated with the thread. You must ensure that + // the Delegate object outlives the thread. + static bool Create(size_t stack_size, Delegate* delegate, + PlatformThreadHandle* thread_handle); + + // CreateWithPriority() does the same thing as Create() except the priority of + // the thread is set based on |priority|. Can be used in place of Create() + // followed by SetThreadPriority(). SetThreadPriority() has not been + // implemented on the Linux platform yet, this is the only way to get a high + // priority thread on Linux. + static bool CreateWithPriority(size_t stack_size, Delegate* delegate, + PlatformThreadHandle* thread_handle, + ThreadPriority priority); + + // CreateNonJoinable() does the same thing as Create() except the thread + // cannot be Join()'d. Therefore, it also does not output a + // PlatformThreadHandle. + static bool CreateNonJoinable(size_t stack_size, Delegate* delegate); + + // Joins with a thread created via the Create function. This function blocks + // the caller until the designated thread exits. This will invalidate + // |thread_handle|. + static void Join(PlatformThreadHandle thread_handle); + + static void SetThreadPriority(PlatformThreadHandle handle, + ThreadPriority priority); + + private: + DISALLOW_IMPLICIT_CONSTRUCTORS(PlatformThread); +}; + +} // namespace base + +#endif // BASE_THREADING_PLATFORM_THREAD_H_ diff --git a/chromium/base/threading/platform_thread_android.cc b/chromium/base/threading/platform_thread_android.cc new file mode 100644 index 00000000000..28026350b66 --- /dev/null +++ b/chromium/base/threading/platform_thread_android.cc @@ -0,0 +1,105 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/platform_thread.h" + +#include <errno.h> +#include <sys/resource.h> + +#include "base/android/jni_android.h" +#include "base/android/thread_utils.h" +#include "base/lazy_instance.h" +#include "base/logging.h" +#include "base/threading/thread_id_name_manager.h" +#include "base/tracked_objects.h" +#include "jni/ThreadUtils_jni.h" + +namespace base { + +namespace { +int ThreadNiceValue(ThreadPriority priority) { + // These nice values are taken from Android, which uses nice + // values like linux, but defines some preset nice values. + // Process.THREAD_PRIORITY_AUDIO = -16 + // Process.THREAD_PRIORITY_BACKGROUND = 10 + // Process.THREAD_PRIORITY_DEFAULT = 0; + // Process.THREAD_PRIORITY_DISPLAY = -4; + // Process.THREAD_PRIORITY_FOREGROUND = -2; + // Process.THREAD_PRIORITY_LESS_FAVORABLE = 1; + // Process.THREAD_PRIORITY_LOWEST = 19; + // Process.THREAD_PRIORITY_MORE_FAVORABLE = -1; + // Process.THREAD_PRIORITY_URGENT_AUDIO = -19; + // Process.THREAD_PRIORITY_URGENT_DISPLAY = -8; + // We use -6 for display, but we may want to split this + // into urgent (-8) and non-urgent (-4). + static const int threadPriorityAudio = -16; + static const int threadPriorityBackground = 10; + static const int threadPriorityDefault = 0; + static const int threadPriorityDisplay = -6; + switch (priority) { + case kThreadPriority_RealtimeAudio: + return threadPriorityAudio; + case kThreadPriority_Background: + return threadPriorityBackground; + case kThreadPriority_Normal: + return threadPriorityDefault; + case kThreadPriority_Display: + return threadPriorityDisplay; + default: + NOTREACHED() << "Unknown priority."; + return 0; + } +} +} // namespace + +//static +void PlatformThread::SetThreadPriority(PlatformThreadHandle handle, + ThreadPriority priority) { + // On Android, we set the Audio priority through JNI as Audio priority + // will also allow the process to run while it is backgrounded. + if (priority == kThreadPriority_RealtimeAudio) { + JNIEnv* env = base::android::AttachCurrentThread(); + Java_ThreadUtils_setThreadPriorityAudio(env, PlatformThread::CurrentId()); + return; + } + + // setpriority(2) will set a thread's priority if it is passed a tid as + // the 'process identifier', not affecting the rest of the threads in the + // process. Setting this priority will only succeed if the user has been + // granted permission to adjust nice values on the system. + DCHECK_NE(handle.id_, kInvalidThreadId); + int kNiceSetting = ThreadNiceValue(priority); + if (setpriority(PRIO_PROCESS, handle.id_, kNiceSetting)) + LOG(ERROR) << "Failed to set nice value of thread to " << kNiceSetting; +} + +void PlatformThread::SetName(const char* name) { + ThreadIdNameManager::GetInstance()->SetName(CurrentId(), name); + tracked_objects::ThreadData::InitializeThreadContext(name); +} + + +void InitThreading() { +} + +void InitOnThread() { + // Threads on linux/android may inherit their priority from the thread + // where they were created. This sets all new threads to the default. + PlatformThread::SetThreadPriority(PlatformThread::CurrentHandle(), + kThreadPriority_Normal); +} + +void TerminateOnThread() { + base::android::DetachFromVM(); +} + +size_t GetDefaultThreadStackSize(const pthread_attr_t& attributes) { + return 0; +} + +bool RegisterThreadUtils(JNIEnv* env) { + return RegisterNativesImpl(env); +} + +} // namespace base diff --git a/chromium/base/threading/platform_thread_linux.cc b/chromium/base/threading/platform_thread_linux.cc new file mode 100644 index 00000000000..80227c32079 --- /dev/null +++ b/chromium/base/threading/platform_thread_linux.cc @@ -0,0 +1,116 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/platform_thread.h" + +#include <errno.h> +#include <sched.h> + +#include "base/lazy_instance.h" +#include "base/logging.h" +#include "base/memory/scoped_ptr.h" +#include "base/safe_strerror_posix.h" +#include "base/threading/thread_id_name_manager.h" +#include "base/threading/thread_restrictions.h" +#include "base/tracked_objects.h" + +#if !defined(OS_NACL) +#include <sys/prctl.h> +#include <sys/resource.h> +#include <sys/syscall.h> +#include <sys/time.h> +#include <unistd.h> +#endif + +namespace base { + +namespace { +int ThreadNiceValue(ThreadPriority priority) { + static const int threadPriorityAudio = -10; + static const int threadPriorityBackground = 10; + static const int threadPriorityDefault = 0; + static const int threadPriorityDisplay = -6; + switch (priority) { + case kThreadPriority_RealtimeAudio: + return threadPriorityAudio; + case kThreadPriority_Background: + return threadPriorityBackground; + case kThreadPriority_Normal: + return threadPriorityDefault; + case kThreadPriority_Display: + return threadPriorityDisplay; + default: + NOTREACHED() << "Unknown priority."; + return 0; + } +} +} // namespace + +// static +void PlatformThread::SetName(const char* name) { + ThreadIdNameManager::GetInstance()->SetName(CurrentId(), name); + tracked_objects::ThreadData::InitializeThreadContext(name); + +#ifndef OS_NACL + // On linux we can get the thread names to show up in the debugger by setting + // the process name for the LWP. We don't want to do this for the main + // thread because that would rename the process, causing tools like killall + // to stop working. + if (PlatformThread::CurrentId() == getpid()) + return; + + // http://0pointer.de/blog/projects/name-your-threads.html + // Set the name for the LWP (which gets truncated to 15 characters). + // Note that glibc also has a 'pthread_setname_np' api, but it may not be + // available everywhere and it's only benefit over using prctl directly is + // that it can set the name of threads other than the current thread. + int err = prctl(PR_SET_NAME, name); + // We expect EPERM failures in sandboxed processes, just ignore those. + if (err < 0 && errno != EPERM) + DPLOG(ERROR) << "prctl(PR_SET_NAME)"; +#endif +} + +// static +void PlatformThread::SetThreadPriority(PlatformThreadHandle handle, + ThreadPriority priority) { +#if !defined(OS_NACL) + if (priority == kThreadPriority_RealtimeAudio) { + const int kRealTimePrio = 8; + + struct sched_param sched_param; + memset(&sched_param, 0, sizeof(sched_param)); + sched_param.sched_priority = kRealTimePrio; + + if (pthread_setschedparam(pthread_self(), SCHED_RR, &sched_param) == 0) { + // Got real time priority, no need to set nice level. + return; + } + } + + // setpriority(2) will set a thread's priority if it is passed a tid as + // the 'process identifier', not affecting the rest of the threads in the + // process. Setting this priority will only succeed if the user has been + // granted permission to adjust nice values on the system. + DCHECK_NE(handle.id_, kInvalidThreadId); + int kNiceSetting = ThreadNiceValue(priority); + if (setpriority(PRIO_PROCESS, handle.id_, kNiceSetting)) + LOG(ERROR) << "Failed to set nice value of thread to " << kNiceSetting; +#endif // !OS_NACL +} + +void InitThreading() { +} + +void InitOnThread() { +} + +void TerminateOnThread() { +} + +size_t GetDefaultThreadStackSize(const pthread_attr_t& attributes) { + return 0; +} + +} // namespace base diff --git a/chromium/base/threading/platform_thread_mac.mm b/chromium/base/threading/platform_thread_mac.mm new file mode 100644 index 00000000000..d81a286cb2c --- /dev/null +++ b/chromium/base/threading/platform_thread_mac.mm @@ -0,0 +1,224 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/platform_thread.h" + +#import <Foundation/Foundation.h> +#include <algorithm> +#include <dlfcn.h> +#include <mach/mach.h> +#include <mach/mach_time.h> +#include <mach/thread_policy.h> +#include <sys/resource.h> + +#include "base/lazy_instance.h" +#include "base/logging.h" +#include "base/threading/thread_id_name_manager.h" +#include "base/tracked_objects.h" + +namespace base { + +// If Cocoa is to be used on more than one thread, it must know that the +// application is multithreaded. Since it's possible to enter Cocoa code +// from threads created by pthread_thread_create, Cocoa won't necessarily +// be aware that the application is multithreaded. Spawning an NSThread is +// enough to get Cocoa to set up for multithreaded operation, so this is done +// if necessary before pthread_thread_create spawns any threads. +// +// http://developer.apple.com/documentation/Cocoa/Conceptual/Multithreading/CreatingThreads/chapter_4_section_4.html +void InitThreading() { + static BOOL multithreaded = [NSThread isMultiThreaded]; + if (!multithreaded) { + // +[NSObject class] is idempotent. + [NSThread detachNewThreadSelector:@selector(class) + toTarget:[NSObject class] + withObject:nil]; + multithreaded = YES; + + DCHECK([NSThread isMultiThreaded]); + } +} + +// static +void PlatformThread::SetName(const char* name) { + ThreadIdNameManager::GetInstance()->SetName(CurrentId(), name); + tracked_objects::ThreadData::InitializeThreadContext(name); + + // pthread_setname_np is only available in 10.6 or later, so test + // for it at runtime. + int (*dynamic_pthread_setname_np)(const char*); + *reinterpret_cast<void**>(&dynamic_pthread_setname_np) = + dlsym(RTLD_DEFAULT, "pthread_setname_np"); + if (!dynamic_pthread_setname_np) + return; + + // Mac OS X does not expose the length limit of the name, so + // hardcode it. + const int kMaxNameLength = 63; + std::string shortened_name = std::string(name).substr(0, kMaxNameLength); + // pthread_setname() fails (harmlessly) in the sandbox, ignore when it does. + // See http://crbug.com/47058 + dynamic_pthread_setname_np(shortened_name.c_str()); +} + +namespace { + +void SetPriorityNormal(mach_port_t mach_thread_id) { + // Make thread standard policy. + // Please note that this call could fail in rare cases depending + // on runtime conditions. + thread_standard_policy policy; + kern_return_t result = thread_policy_set(mach_thread_id, + THREAD_STANDARD_POLICY, + (thread_policy_t)&policy, + THREAD_STANDARD_POLICY_COUNT); + + if (result != KERN_SUCCESS) + DVLOG(1) << "thread_policy_set() failure: " << result; +} + +// Enables time-contraint policy and priority suitable for low-latency, +// glitch-resistant audio. +void SetPriorityRealtimeAudio(mach_port_t mach_thread_id) { + kern_return_t result; + + // Increase thread priority to real-time. + + // Please note that the thread_policy_set() calls may fail in + // rare cases if the kernel decides the system is under heavy load + // and is unable to handle boosting the thread priority. + // In these cases we just return early and go on with life. + + // Make thread fixed priority. + thread_extended_policy_data_t policy; + policy.timeshare = 0; // Set to 1 for a non-fixed thread. + result = thread_policy_set(mach_thread_id, + THREAD_EXTENDED_POLICY, + (thread_policy_t)&policy, + THREAD_EXTENDED_POLICY_COUNT); + if (result != KERN_SUCCESS) { + DVLOG(1) << "thread_policy_set() failure: " << result; + return; + } + + // Set to relatively high priority. + thread_precedence_policy_data_t precedence; + precedence.importance = 63; + result = thread_policy_set(mach_thread_id, + THREAD_PRECEDENCE_POLICY, + (thread_policy_t)&precedence, + THREAD_PRECEDENCE_POLICY_COUNT); + if (result != KERN_SUCCESS) { + DVLOG(1) << "thread_policy_set() failure: " << result; + return; + } + + // Most important, set real-time constraints. + + // Define the guaranteed and max fraction of time for the audio thread. + // These "duty cycle" values can range from 0 to 1. A value of 0.5 + // means the scheduler would give half the time to the thread. + // These values have empirically been found to yield good behavior. + // Good means that audio performance is high and other threads won't starve. + const double kGuaranteedAudioDutyCycle = 0.75; + const double kMaxAudioDutyCycle = 0.85; + + // Define constants determining how much time the audio thread can + // use in a given time quantum. All times are in milliseconds. + + // About 128 frames @44.1KHz + const double kTimeQuantum = 2.9; + + // Time guaranteed each quantum. + const double kAudioTimeNeeded = kGuaranteedAudioDutyCycle * kTimeQuantum; + + // Maximum time each quantum. + const double kMaxTimeAllowed = kMaxAudioDutyCycle * kTimeQuantum; + + // Get the conversion factor from milliseconds to absolute time + // which is what the time-constraints call needs. + mach_timebase_info_data_t tb_info; + mach_timebase_info(&tb_info); + double ms_to_abs_time = + ((double)tb_info.denom / (double)tb_info.numer) * 1000000; + + thread_time_constraint_policy_data_t time_constraints; + time_constraints.period = kTimeQuantum * ms_to_abs_time; + time_constraints.computation = kAudioTimeNeeded * ms_to_abs_time; + time_constraints.constraint = kMaxTimeAllowed * ms_to_abs_time; + time_constraints.preemptible = 0; + + result = thread_policy_set(mach_thread_id, + THREAD_TIME_CONSTRAINT_POLICY, + (thread_policy_t)&time_constraints, + THREAD_TIME_CONSTRAINT_POLICY_COUNT); + if (result != KERN_SUCCESS) + DVLOG(1) << "thread_policy_set() failure: " << result; + + return; +} + +} // anonymous namespace + +// static +void PlatformThread::SetThreadPriority(PlatformThreadHandle handle, + ThreadPriority priority) { + // Convert from pthread_t to mach thread identifier. + mach_port_t mach_thread_id = pthread_mach_thread_np(handle.handle_); + + switch (priority) { + case kThreadPriority_Normal: + SetPriorityNormal(mach_thread_id); + break; + case kThreadPriority_RealtimeAudio: + SetPriorityRealtimeAudio(mach_thread_id); + break; + default: + NOTREACHED() << "Unknown priority."; + break; + } +} + +size_t GetDefaultThreadStackSize(const pthread_attr_t& attributes) { +#if defined(OS_IOS) + return 0; +#else + // The Mac OS X default for a pthread stack size is 512kB. + // Libc-594.1.4/pthreads/pthread.c's pthread_attr_init uses + // DEFAULT_STACK_SIZE for this purpose. + // + // 512kB isn't quite generous enough for some deeply recursive threads that + // otherwise request the default stack size by specifying 0. Here, adopt + // glibc's behavior as on Linux, which is to use the current stack size + // limit (ulimit -s) as the default stack size. See + // glibc-2.11.1/nptl/nptl-init.c's __pthread_initialize_minimal_internal. To + // avoid setting the limit below the Mac OS X default or the minimum usable + // stack size, these values are also considered. If any of these values + // can't be determined, or if stack size is unlimited (ulimit -s unlimited), + // stack_size is left at 0 to get the system default. + // + // Mac OS X normally only applies ulimit -s to the main thread stack. On + // contemporary OS X and Linux systems alike, this value is generally 8MB + // or in that neighborhood. + size_t default_stack_size = 0; + struct rlimit stack_rlimit; + if (pthread_attr_getstacksize(&attributes, &default_stack_size) == 0 && + getrlimit(RLIMIT_STACK, &stack_rlimit) == 0 && + stack_rlimit.rlim_cur != RLIM_INFINITY) { + default_stack_size = + std::max(std::max(default_stack_size, + static_cast<size_t>(PTHREAD_STACK_MIN)), + static_cast<size_t>(stack_rlimit.rlim_cur)); + } + return default_stack_size; +#endif +} + +void InitOnThread() { +} + +void TerminateOnThread() { +} + +} // namespace base diff --git a/chromium/base/threading/platform_thread_posix.cc b/chromium/base/threading/platform_thread_posix.cc new file mode 100644 index 00000000000..43a42eca98a --- /dev/null +++ b/chromium/base/threading/platform_thread_posix.cc @@ -0,0 +1,231 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/platform_thread.h" + +#include <errno.h> +#include <sched.h> + +#include "base/lazy_instance.h" +#include "base/logging.h" +#include "base/memory/scoped_ptr.h" +#include "base/safe_strerror_posix.h" +#include "base/synchronization/waitable_event.h" +#include "base/threading/thread_id_name_manager.h" +#include "base/threading/thread_restrictions.h" +#include "base/tracked_objects.h" + +#if defined(OS_MACOSX) +#include <sys/resource.h> +#include <algorithm> +#endif + +#if defined(OS_LINUX) +#include <sys/prctl.h> +#include <sys/resource.h> +#include <sys/syscall.h> +#include <sys/time.h> +#include <unistd.h> +#endif + +namespace base { + +void InitThreading(); +void InitOnThread(); +void TerminateOnThread(); +size_t GetDefaultThreadStackSize(const pthread_attr_t& attributes); + +namespace { + +struct ThreadParams { + ThreadParams() + : delegate(NULL), + joinable(false), + priority(kThreadPriority_Normal), + handle(NULL), + handle_set(false, false) { + } + + PlatformThread::Delegate* delegate; + bool joinable; + ThreadPriority priority; + PlatformThreadHandle* handle; + WaitableEvent handle_set; +}; + +void* ThreadFunc(void* params) { + base::InitOnThread(); + ThreadParams* thread_params = static_cast<ThreadParams*>(params); + + PlatformThread::Delegate* delegate = thread_params->delegate; + if (!thread_params->joinable) + base::ThreadRestrictions::SetSingletonAllowed(false); + + if (thread_params->priority != kThreadPriority_Normal) { + PlatformThread::SetThreadPriority(PlatformThread::CurrentHandle(), + thread_params->priority); + } + + // Stash the id in the handle so the calling thread has a complete + // handle, and unblock the parent thread. + *(thread_params->handle) = PlatformThreadHandle(pthread_self(), + PlatformThread::CurrentId()); + thread_params->handle_set.Signal(); + + ThreadIdNameManager::GetInstance()->RegisterThread( + PlatformThread::CurrentHandle().platform_handle(), + PlatformThread::CurrentId()); + + delegate->ThreadMain(); + + ThreadIdNameManager::GetInstance()->RemoveName( + PlatformThread::CurrentHandle().platform_handle(), + PlatformThread::CurrentId()); + + base::TerminateOnThread(); + return NULL; +} + +bool CreateThread(size_t stack_size, bool joinable, + PlatformThread::Delegate* delegate, + PlatformThreadHandle* thread_handle, + ThreadPriority priority) { + base::InitThreading(); + + bool success = false; + pthread_attr_t attributes; + pthread_attr_init(&attributes); + + // Pthreads are joinable by default, so only specify the detached + // attribute if the thread should be non-joinable. + if (!joinable) { + pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_DETACHED); + } + + // Get a better default if available. + if (stack_size == 0) + stack_size = base::GetDefaultThreadStackSize(attributes); + + if (stack_size > 0) + pthread_attr_setstacksize(&attributes, stack_size); + + ThreadParams params; + params.delegate = delegate; + params.joinable = joinable; + params.priority = priority; + params.handle = thread_handle; + + pthread_t handle = 0; + int err = pthread_create(&handle, + &attributes, + ThreadFunc, + ¶ms); + success = !err; + if (!success) { + errno = err; + PLOG(ERROR) << "pthread_create"; + } + + pthread_attr_destroy(&attributes); + + // Don't let this call complete until the thread id + // is set in the handle. + if (success) + params.handle_set.Wait(); + CHECK_EQ(handle, thread_handle->platform_handle()); + + return success; +} + +} // namespace + +// static +PlatformThreadId PlatformThread::CurrentId() { + // Pthreads doesn't have the concept of a thread ID, so we have to reach down + // into the kernel. +#if defined(OS_MACOSX) + return pthread_mach_thread_np(pthread_self()); +#elif defined(OS_LINUX) + return syscall(__NR_gettid); +#elif defined(OS_ANDROID) + return gettid(); +#elif defined(OS_SOLARIS) + return pthread_self(); +#elif defined(OS_NACL) && defined(__GLIBC__) + return pthread_self(); +#elif defined(OS_NACL) && !defined(__GLIBC__) + // Pointers are 32-bits in NaCl. + return reinterpret_cast<int32>(pthread_self()); +#elif defined(OS_POSIX) + return reinterpret_cast<int64>(pthread_self()); +#endif +} + +//static +PlatformThreadHandle PlatformThread::CurrentHandle() { + return PlatformThreadHandle(pthread_self(), CurrentId()); +} + +// static +void PlatformThread::YieldCurrentThread() { + sched_yield(); +} + +// static +void PlatformThread::Sleep(TimeDelta duration) { + struct timespec sleep_time, remaining; + + // Break the duration into seconds and nanoseconds. + // NOTE: TimeDelta's microseconds are int64s while timespec's + // nanoseconds are longs, so this unpacking must prevent overflow. + sleep_time.tv_sec = duration.InSeconds(); + duration -= TimeDelta::FromSeconds(sleep_time.tv_sec); + sleep_time.tv_nsec = duration.InMicroseconds() * 1000; // nanoseconds + + while (nanosleep(&sleep_time, &remaining) == -1 && errno == EINTR) + sleep_time = remaining; +} + +// static +const char* PlatformThread::GetName() { + return ThreadIdNameManager::GetInstance()->GetName(CurrentId()); +} + +// static +bool PlatformThread::Create(size_t stack_size, Delegate* delegate, + PlatformThreadHandle* thread_handle) { + base::ThreadRestrictions::ScopedAllowWait allow_wait; + return CreateThread(stack_size, true /* joinable thread */, + delegate, thread_handle, kThreadPriority_Normal); +} + +// static +bool PlatformThread::CreateWithPriority(size_t stack_size, Delegate* delegate, + PlatformThreadHandle* thread_handle, + ThreadPriority priority) { + base::ThreadRestrictions::ScopedAllowWait allow_wait; + return CreateThread(stack_size, true, // joinable thread + delegate, thread_handle, priority); +} + +// static +bool PlatformThread::CreateNonJoinable(size_t stack_size, Delegate* delegate) { + PlatformThreadHandle unused; + + base::ThreadRestrictions::ScopedAllowWait allow_wait; + bool result = CreateThread(stack_size, false /* non-joinable thread */, + delegate, &unused, kThreadPriority_Normal); + return result; +} + +// static +void PlatformThread::Join(PlatformThreadHandle thread_handle) { + // Joining another thread may block the current thread for a long time, since + // the thread referred to by |thread_handle| may still be running long-lived / + // blocking tasks. + base::ThreadRestrictions::AssertIOAllowed(); + pthread_join(thread_handle.handle_, NULL); +} + +} // namespace base diff --git a/chromium/base/threading/platform_thread_unittest.cc b/chromium/base/threading/platform_thread_unittest.cc new file mode 100644 index 00000000000..59f29dadc3f --- /dev/null +++ b/chromium/base/threading/platform_thread_unittest.cc @@ -0,0 +1,121 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/compiler_specific.h" +#include "base/threading/platform_thread.h" + +#include "testing/gtest/include/gtest/gtest.h" + +namespace base { + +// Trivial tests that thread runs and doesn't crash on create and join --------- + +class TrivialThread : public PlatformThread::Delegate { + public: + TrivialThread() : did_run_(false) {} + + virtual void ThreadMain() OVERRIDE { + did_run_ = true; + } + + bool did_run() const { return did_run_; } + + private: + bool did_run_; + + DISALLOW_COPY_AND_ASSIGN(TrivialThread); +}; + +TEST(PlatformThreadTest, Trivial) { + TrivialThread thread; + PlatformThreadHandle handle; + + ASSERT_FALSE(thread.did_run()); + ASSERT_TRUE(PlatformThread::Create(0, &thread, &handle)); + PlatformThread::Join(handle); + ASSERT_TRUE(thread.did_run()); +} + +TEST(PlatformThreadTest, TrivialTimesTen) { + TrivialThread thread[10]; + PlatformThreadHandle handle[arraysize(thread)]; + + for (size_t n = 0; n < arraysize(thread); n++) + ASSERT_FALSE(thread[n].did_run()); + for (size_t n = 0; n < arraysize(thread); n++) + ASSERT_TRUE(PlatformThread::Create(0, &thread[n], &handle[n])); + for (size_t n = 0; n < arraysize(thread); n++) + PlatformThread::Join(handle[n]); + for (size_t n = 0; n < arraysize(thread); n++) + ASSERT_TRUE(thread[n].did_run()); +} + +// Tests of basic thread functions --------------------------------------------- + +class FunctionTestThread : public TrivialThread { + public: + FunctionTestThread() : thread_id_(0) {} + + virtual void ThreadMain() OVERRIDE { + thread_id_ = PlatformThread::CurrentId(); + PlatformThread::YieldCurrentThread(); + PlatformThread::Sleep(TimeDelta::FromMilliseconds(50)); + + // Make sure that the thread ID is the same across calls. + EXPECT_EQ(thread_id_, PlatformThread::CurrentId()); + + TrivialThread::ThreadMain(); + } + + PlatformThreadId thread_id() const { return thread_id_; } + + private: + PlatformThreadId thread_id_; + + DISALLOW_COPY_AND_ASSIGN(FunctionTestThread); +}; + +TEST(PlatformThreadTest, Function) { + PlatformThreadId main_thread_id = PlatformThread::CurrentId(); + + FunctionTestThread thread; + PlatformThreadHandle handle; + + ASSERT_FALSE(thread.did_run()); + ASSERT_TRUE(PlatformThread::Create(0, &thread, &handle)); + PlatformThread::Join(handle); + ASSERT_TRUE(thread.did_run()); + EXPECT_NE(thread.thread_id(), main_thread_id); + + // Make sure that the thread ID is the same across calls. + EXPECT_EQ(main_thread_id, PlatformThread::CurrentId()); +} + +TEST(PlatformThreadTest, FunctionTimesTen) { + PlatformThreadId main_thread_id = PlatformThread::CurrentId(); + + FunctionTestThread thread[10]; + PlatformThreadHandle handle[arraysize(thread)]; + + for (size_t n = 0; n < arraysize(thread); n++) + ASSERT_FALSE(thread[n].did_run()); + for (size_t n = 0; n < arraysize(thread); n++) + ASSERT_TRUE(PlatformThread::Create(0, &thread[n], &handle[n])); + for (size_t n = 0; n < arraysize(thread); n++) + PlatformThread::Join(handle[n]); + for (size_t n = 0; n < arraysize(thread); n++) { + ASSERT_TRUE(thread[n].did_run()); + EXPECT_NE(thread[n].thread_id(), main_thread_id); + + // Make sure no two threads get the same ID. + for (size_t i = 0; i < n; ++i) { + EXPECT_NE(thread[i].thread_id(), thread[n].thread_id()); + } + } + + // Make sure that the thread ID is the same across calls. + EXPECT_EQ(main_thread_id, PlatformThread::CurrentId()); +} + +} // namespace base diff --git a/chromium/base/threading/platform_thread_win.cc b/chromium/base/threading/platform_thread_win.cc new file mode 100644 index 00000000000..cf1e0f60ea1 --- /dev/null +++ b/chromium/base/threading/platform_thread_win.cc @@ -0,0 +1,234 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/platform_thread.h" + +#include "base/debug/alias.h" +#include "base/debug/profiler.h" +#include "base/logging.h" +#include "base/threading/thread_id_name_manager.h" +#include "base/threading/thread_restrictions.h" +#include "base/tracked_objects.h" + +#include "base/win/windows_version.h" + +namespace base { + +namespace { + +// The information on how to set the thread name comes from +// a MSDN article: http://msdn2.microsoft.com/en-us/library/xcb2z8hs.aspx +const DWORD kVCThreadNameException = 0x406D1388; + +typedef struct tagTHREADNAME_INFO { + DWORD dwType; // Must be 0x1000. + LPCSTR szName; // Pointer to name (in user addr space). + DWORD dwThreadID; // Thread ID (-1=caller thread). + DWORD dwFlags; // Reserved for future use, must be zero. +} THREADNAME_INFO; + +// This function has try handling, so it is separated out of its caller. +void SetNameInternal(PlatformThreadId thread_id, const char* name) { + THREADNAME_INFO info; + info.dwType = 0x1000; + info.szName = name; + info.dwThreadID = thread_id; + info.dwFlags = 0; + + __try { + RaiseException(kVCThreadNameException, 0, sizeof(info)/sizeof(DWORD), + reinterpret_cast<DWORD_PTR*>(&info)); + } __except(EXCEPTION_CONTINUE_EXECUTION) { + } +} + +struct ThreadParams { + PlatformThread::Delegate* delegate; + bool joinable; +}; + +DWORD __stdcall ThreadFunc(void* params) { + ThreadParams* thread_params = static_cast<ThreadParams*>(params); + PlatformThread::Delegate* delegate = thread_params->delegate; + if (!thread_params->joinable) + base::ThreadRestrictions::SetSingletonAllowed(false); + + /* Retrieve a copy of the thread handle to use as the key in the + * thread name mapping. */ + PlatformThreadHandle::Handle platform_handle; + DuplicateHandle( + GetCurrentProcess(), + GetCurrentThread(), + GetCurrentProcess(), + &platform_handle, + 0, + FALSE, + DUPLICATE_SAME_ACCESS); + + ThreadIdNameManager::GetInstance()->RegisterThread( + platform_handle, + PlatformThread::CurrentId()); + + delete thread_params; + delegate->ThreadMain(); + + ThreadIdNameManager::GetInstance()->RemoveName( + platform_handle, + PlatformThread::CurrentId()); + return NULL; +} + +// CreateThreadInternal() matches PlatformThread::Create(), except that +// |out_thread_handle| may be NULL, in which case a non-joinable thread is +// created. +bool CreateThreadInternal(size_t stack_size, + PlatformThread::Delegate* delegate, + PlatformThreadHandle* out_thread_handle) { + unsigned int flags = 0; + if (stack_size > 0 && base::win::GetVersion() >= base::win::VERSION_XP) { + flags = STACK_SIZE_PARAM_IS_A_RESERVATION; + } else { + stack_size = 0; + } + + ThreadParams* params = new ThreadParams; + params->delegate = delegate; + params->joinable = out_thread_handle != NULL; + + // Using CreateThread here vs _beginthreadex makes thread creation a bit + // faster and doesn't require the loader lock to be available. Our code will + // have to work running on CreateThread() threads anyway, since we run code + // on the Windows thread pool, etc. For some background on the difference: + // http://www.microsoft.com/msj/1099/win32/win321099.aspx + void* thread_handle = CreateThread( + NULL, stack_size, ThreadFunc, params, flags, NULL); + if (!thread_handle) { + delete params; + return false; + } + + if (out_thread_handle) + *out_thread_handle = PlatformThreadHandle(thread_handle); + else + CloseHandle(thread_handle); + return true; +} + +} // namespace + +// static +PlatformThreadId PlatformThread::CurrentId() { + return GetCurrentThreadId(); +} + +// static +PlatformThreadHandle PlatformThread::CurrentHandle() { + NOTIMPLEMENTED(); // See OpenThread() + return PlatformThreadHandle(); +} + +// static +void PlatformThread::YieldCurrentThread() { + ::Sleep(0); +} + +// static +void PlatformThread::Sleep(TimeDelta duration) { + ::Sleep(duration.InMillisecondsRoundedUp()); +} + +// static +void PlatformThread::SetName(const char* name) { + ThreadIdNameManager::GetInstance()->SetName(CurrentId(), name); + + // On Windows only, we don't need to tell the profiler about the "BrokerEvent" + // thread, as it exists only in the chrome.exe image, and never spawns or runs + // tasks (items which could be profiled). This test avoids the notification, + // which would also (as a side effect) initialize the profiler in this unused + // context, including setting up thread local storage, etc. The performance + // impact is not terrible, but there is no reason to do initialize it. + if (0 != strcmp(name, "BrokerEvent")) + tracked_objects::ThreadData::InitializeThreadContext(name); + + // The debugger needs to be around to catch the name in the exception. If + // there isn't a debugger, we are just needlessly throwing an exception. + // If this image file is instrumented, we raise the exception anyway + // to provide the profiler with human-readable thread names. + if (!::IsDebuggerPresent() && !base::debug::IsBinaryInstrumented()) + return; + + SetNameInternal(CurrentId(), name); +} + +// static +const char* PlatformThread::GetName() { + return ThreadIdNameManager::GetInstance()->GetName(CurrentId()); +} + +// static +bool PlatformThread::Create(size_t stack_size, Delegate* delegate, + PlatformThreadHandle* thread_handle) { + DCHECK(thread_handle); + return CreateThreadInternal(stack_size, delegate, thread_handle); +} + +// static +bool PlatformThread::CreateWithPriority(size_t stack_size, Delegate* delegate, + PlatformThreadHandle* thread_handle, + ThreadPriority priority) { + bool result = Create(stack_size, delegate, thread_handle); + if (result) + SetThreadPriority(*thread_handle, priority); + return result; +} + +// static +bool PlatformThread::CreateNonJoinable(size_t stack_size, Delegate* delegate) { + return CreateThreadInternal(stack_size, delegate, NULL); +} + +// static +void PlatformThread::Join(PlatformThreadHandle thread_handle) { + DCHECK(thread_handle.handle_); + // TODO(willchan): Enable this check once I can get it to work for Windows + // shutdown. + // Joining another thread may block the current thread for a long time, since + // the thread referred to by |thread_handle| may still be running long-lived / + // blocking tasks. +#if 0 + base::ThreadRestrictions::AssertIOAllowed(); +#endif + + // Wait for the thread to exit. It should already have terminated but make + // sure this assumption is valid. + DWORD result = WaitForSingleObject(thread_handle.handle_, INFINITE); + if (result != WAIT_OBJECT_0) { + // Debug info for bug 127931. + DWORD error = GetLastError(); + debug::Alias(&error); + debug::Alias(&result); + debug::Alias(&thread_handle.handle_); + CHECK(false); + } + + CloseHandle(thread_handle.handle_); +} + +// static +void PlatformThread::SetThreadPriority(PlatformThreadHandle handle, + ThreadPriority priority) { + switch (priority) { + case kThreadPriority_Normal: + ::SetThreadPriority(handle.handle_, THREAD_PRIORITY_NORMAL); + break; + case kThreadPriority_RealtimeAudio: + ::SetThreadPriority(handle.handle_, THREAD_PRIORITY_TIME_CRITICAL); + break; + default: + NOTREACHED() << "Unknown priority."; + break; + } +} + +} // namespace base diff --git a/chromium/base/threading/post_task_and_reply_impl.cc b/chromium/base/threading/post_task_and_reply_impl.cc new file mode 100644 index 00000000000..f464c6acad2 --- /dev/null +++ b/chromium/base/threading/post_task_and_reply_impl.cc @@ -0,0 +1,90 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/post_task_and_reply_impl.h" + +#include "base/bind.h" +#include "base/location.h" +#include "base/message_loop/message_loop_proxy.h" + +namespace base { + +namespace { + +// This relay class remembers the MessageLoop that it was created on, and +// ensures that both the |task| and |reply| Closures are deleted on this same +// thread. Also, |task| is guaranteed to be deleted before |reply| is run or +// deleted. +// +// If this is not possible because the originating MessageLoop is no longer +// available, the the |task| and |reply| Closures are leaked. Leaking is +// considered preferable to having a thread-safetey violations caused by +// invoking the Closure destructor on the wrong thread. +class PostTaskAndReplyRelay { + public: + PostTaskAndReplyRelay(const tracked_objects::Location& from_here, + const Closure& task, const Closure& reply) + : from_here_(from_here), + origin_loop_(MessageLoopProxy::current()) { + task_ = task; + reply_ = reply; + } + + ~PostTaskAndReplyRelay() { + DCHECK(origin_loop_->BelongsToCurrentThread()); + task_.Reset(); + reply_.Reset(); + } + + void Run() { + task_.Run(); + origin_loop_->PostTask( + from_here_, + Bind(&PostTaskAndReplyRelay::RunReplyAndSelfDestruct, + base::Unretained(this))); + } + + private: + void RunReplyAndSelfDestruct() { + DCHECK(origin_loop_->BelongsToCurrentThread()); + + // Force |task_| to be released before |reply_| is to ensure that no one + // accidentally depends on |task_| keeping one of its arguments alive while + // |reply_| is executing. + task_.Reset(); + + reply_.Run(); + + // Cue mission impossible theme. + delete this; + } + + tracked_objects::Location from_here_; + scoped_refptr<MessageLoopProxy> origin_loop_; + Closure reply_; + Closure task_; +}; + +} // namespace + +namespace internal { + +bool PostTaskAndReplyImpl::PostTaskAndReply( + const tracked_objects::Location& from_here, + const Closure& task, + const Closure& reply) { + PostTaskAndReplyRelay* relay = + new PostTaskAndReplyRelay(from_here, task, reply); + if (!PostTask(from_here, Bind(&PostTaskAndReplyRelay::Run, + Unretained(relay)))) { + delete relay; + return false; + } + + return true; +} + +} // namespace internal + +} // namespace base diff --git a/chromium/base/threading/post_task_and_reply_impl.h b/chromium/base/threading/post_task_and_reply_impl.h new file mode 100644 index 00000000000..076a46d69e8 --- /dev/null +++ b/chromium/base/threading/post_task_and_reply_impl.h @@ -0,0 +1,42 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// This file contains the implementation shared by +// MessageLoopProxy::PostTaskAndReply and WorkerPool::PostTaskAndReply. + +#ifndef BASE_THREADING_POST_TASK_AND_REPLY_IMPL_H_ +#define BASE_THREADING_POST_TASK_AND_REPLY_IMPL_H_ + +#include "base/callback_forward.h" +#include "base/location.h" + +namespace base { +namespace internal { + +// Inherit from this in a class that implements PostTask appropriately +// for sending to a destination thread. +// +// Note that 'reply' will always get posted back to your current +// MessageLoop. +// +// If you're looking for a concrete implementation of +// PostTaskAndReply, you probably want base::MessageLoopProxy, or you +// may want base::WorkerPool. +class PostTaskAndReplyImpl { + public: + // Implementation for MessageLoopProxy::PostTaskAndReply and + // WorkerPool::PostTaskAndReply. + bool PostTaskAndReply(const tracked_objects::Location& from_here, + const Closure& task, + const Closure& reply); + + private: + virtual bool PostTask(const tracked_objects::Location& from_here, + const Closure& task) = 0; +}; + +} // namespace internal +} // namespace base + +#endif // BASE_THREADING_POST_TASK_AND_REPLY_IMPL_H_ diff --git a/chromium/base/threading/sequenced_worker_pool.cc b/chromium/base/threading/sequenced_worker_pool.cc new file mode 100644 index 00000000000..d9921689fe8 --- /dev/null +++ b/chromium/base/threading/sequenced_worker_pool.cc @@ -0,0 +1,1287 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/sequenced_worker_pool.h" + +#include <list> +#include <map> +#include <set> +#include <utility> +#include <vector> + +#include "base/atomic_sequence_num.h" +#include "base/callback.h" +#include "base/compiler_specific.h" +#include "base/critical_closure.h" +#include "base/debug/trace_event.h" +#include "base/lazy_instance.h" +#include "base/logging.h" +#include "base/memory/linked_ptr.h" +#include "base/message_loop/message_loop_proxy.h" +#include "base/stl_util.h" +#include "base/strings/stringprintf.h" +#include "base/synchronization/condition_variable.h" +#include "base/synchronization/lock.h" +#include "base/threading/platform_thread.h" +#include "base/threading/simple_thread.h" +#include "base/threading/thread_local.h" +#include "base/threading/thread_restrictions.h" +#include "base/time/time.h" +#include "base/tracked_objects.h" + +#if defined(OS_MACOSX) +#include "base/mac/scoped_nsautorelease_pool.h" +#endif + +#if !defined(OS_NACL) +#include "base/metrics/histogram.h" +#endif + +namespace base { + +namespace { + +struct SequencedTask : public TrackingInfo { + SequencedTask() + : sequence_token_id(0), + trace_id(0), + sequence_task_number(0), + shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} + + explicit SequencedTask(const tracked_objects::Location& from_here) + : base::TrackingInfo(from_here, TimeTicks()), + sequence_token_id(0), + trace_id(0), + sequence_task_number(0), + shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} + + ~SequencedTask() {} + + int sequence_token_id; + int trace_id; + int64 sequence_task_number; + SequencedWorkerPool::WorkerShutdown shutdown_behavior; + tracked_objects::Location posted_from; + Closure task; + + // Non-delayed tasks and delayed tasks are managed together by time-to-run + // order. We calculate the time by adding the posted time and the given delay. + TimeTicks time_to_run; +}; + +struct SequencedTaskLessThan { + public: + bool operator()(const SequencedTask& lhs, const SequencedTask& rhs) const { + if (lhs.time_to_run < rhs.time_to_run) + return true; + + if (lhs.time_to_run > rhs.time_to_run) + return false; + + // If the time happen to match, then we use the sequence number to decide. + return lhs.sequence_task_number < rhs.sequence_task_number; + } +}; + +// SequencedWorkerPoolTaskRunner --------------------------------------------- +// A TaskRunner which posts tasks to a SequencedWorkerPool with a +// fixed ShutdownBehavior. +// +// Note that this class is RefCountedThreadSafe (inherited from TaskRunner). +class SequencedWorkerPoolTaskRunner : public TaskRunner { + public: + SequencedWorkerPoolTaskRunner( + const scoped_refptr<SequencedWorkerPool>& pool, + SequencedWorkerPool::WorkerShutdown shutdown_behavior); + + // TaskRunner implementation + virtual bool PostDelayedTask(const tracked_objects::Location& from_here, + const Closure& task, + TimeDelta delay) OVERRIDE; + virtual bool RunsTasksOnCurrentThread() const OVERRIDE; + + private: + virtual ~SequencedWorkerPoolTaskRunner(); + + const scoped_refptr<SequencedWorkerPool> pool_; + + const SequencedWorkerPool::WorkerShutdown shutdown_behavior_; + + DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolTaskRunner); +}; + +SequencedWorkerPoolTaskRunner::SequencedWorkerPoolTaskRunner( + const scoped_refptr<SequencedWorkerPool>& pool, + SequencedWorkerPool::WorkerShutdown shutdown_behavior) + : pool_(pool), + shutdown_behavior_(shutdown_behavior) { +} + +SequencedWorkerPoolTaskRunner::~SequencedWorkerPoolTaskRunner() { +} + +bool SequencedWorkerPoolTaskRunner::PostDelayedTask( + const tracked_objects::Location& from_here, + const Closure& task, + TimeDelta delay) { + if (delay == TimeDelta()) { + return pool_->PostWorkerTaskWithShutdownBehavior( + from_here, task, shutdown_behavior_); + } + return pool_->PostDelayedWorkerTask(from_here, task, delay); +} + +bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const { + return pool_->RunsTasksOnCurrentThread(); +} + +// SequencedWorkerPoolSequencedTaskRunner ------------------------------------ +// A SequencedTaskRunner which posts tasks to a SequencedWorkerPool with a +// fixed sequence token. +// +// Note that this class is RefCountedThreadSafe (inherited from TaskRunner). +class SequencedWorkerPoolSequencedTaskRunner : public SequencedTaskRunner { + public: + SequencedWorkerPoolSequencedTaskRunner( + const scoped_refptr<SequencedWorkerPool>& pool, + SequencedWorkerPool::SequenceToken token, + SequencedWorkerPool::WorkerShutdown shutdown_behavior); + + // TaskRunner implementation + virtual bool PostDelayedTask(const tracked_objects::Location& from_here, + const Closure& task, + TimeDelta delay) OVERRIDE; + virtual bool RunsTasksOnCurrentThread() const OVERRIDE; + + // SequencedTaskRunner implementation + virtual bool PostNonNestableDelayedTask( + const tracked_objects::Location& from_here, + const Closure& task, + TimeDelta delay) OVERRIDE; + + private: + virtual ~SequencedWorkerPoolSequencedTaskRunner(); + + const scoped_refptr<SequencedWorkerPool> pool_; + + const SequencedWorkerPool::SequenceToken token_; + + const SequencedWorkerPool::WorkerShutdown shutdown_behavior_; + + DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolSequencedTaskRunner); +}; + +SequencedWorkerPoolSequencedTaskRunner::SequencedWorkerPoolSequencedTaskRunner( + const scoped_refptr<SequencedWorkerPool>& pool, + SequencedWorkerPool::SequenceToken token, + SequencedWorkerPool::WorkerShutdown shutdown_behavior) + : pool_(pool), + token_(token), + shutdown_behavior_(shutdown_behavior) { +} + +SequencedWorkerPoolSequencedTaskRunner:: +~SequencedWorkerPoolSequencedTaskRunner() { +} + +bool SequencedWorkerPoolSequencedTaskRunner::PostDelayedTask( + const tracked_objects::Location& from_here, + const Closure& task, + TimeDelta delay) { + if (delay == TimeDelta()) { + return pool_->PostSequencedWorkerTaskWithShutdownBehavior( + token_, from_here, task, shutdown_behavior_); + } + return pool_->PostDelayedSequencedWorkerTask(token_, from_here, task, delay); +} + +bool SequencedWorkerPoolSequencedTaskRunner::RunsTasksOnCurrentThread() const { + return pool_->IsRunningSequenceOnCurrentThread(token_); +} + +bool SequencedWorkerPoolSequencedTaskRunner::PostNonNestableDelayedTask( + const tracked_objects::Location& from_here, + const Closure& task, + TimeDelta delay) { + // There's no way to run nested tasks, so simply forward to + // PostDelayedTask. + return PostDelayedTask(from_here, task, delay); +} + +// Create a process-wide unique ID to represent this task in trace events. This +// will be mangled with a Process ID hash to reduce the likelyhood of colliding +// with MessageLoop pointers on other processes. +uint64 GetTaskTraceID(const SequencedTask& task, + void* pool) { + return (static_cast<uint64>(task.trace_id) << 32) | + static_cast<uint64>(reinterpret_cast<intptr_t>(pool)); +} + +base::LazyInstance<base::ThreadLocalPointer< + SequencedWorkerPool::SequenceToken> > g_lazy_tls_ptr = + LAZY_INSTANCE_INITIALIZER; + +} // namespace + +// Worker --------------------------------------------------------------------- + +class SequencedWorkerPool::Worker : public SimpleThread { + public: + // Hold a (cyclic) ref to |worker_pool|, since we want to keep it + // around as long as we are running. + Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool, + int thread_number, + const std::string& thread_name_prefix); + virtual ~Worker(); + + // SimpleThread implementation. This actually runs the background thread. + virtual void Run() OVERRIDE; + + void set_running_task_info(SequenceToken token, + WorkerShutdown shutdown_behavior) { + running_sequence_ = token; + running_shutdown_behavior_ = shutdown_behavior; + } + + SequenceToken running_sequence() const { + return running_sequence_; + } + + WorkerShutdown running_shutdown_behavior() const { + return running_shutdown_behavior_; + } + + private: + scoped_refptr<SequencedWorkerPool> worker_pool_; + SequenceToken running_sequence_; + WorkerShutdown running_shutdown_behavior_; + + DISALLOW_COPY_AND_ASSIGN(Worker); +}; + +// Inner ---------------------------------------------------------------------- + +class SequencedWorkerPool::Inner { + public: + // Take a raw pointer to |worker| to avoid cycles (since we're owned + // by it). + Inner(SequencedWorkerPool* worker_pool, size_t max_threads, + const std::string& thread_name_prefix, + TestingObserver* observer); + + ~Inner(); + + SequenceToken GetSequenceToken(); + + SequenceToken GetNamedSequenceToken(const std::string& name); + + // This function accepts a name and an ID. If the name is null, the + // token ID is used. This allows us to implement the optional name lookup + // from a single function without having to enter the lock a separate time. + bool PostTask(const std::string* optional_token_name, + SequenceToken sequence_token, + WorkerShutdown shutdown_behavior, + const tracked_objects::Location& from_here, + const Closure& task, + TimeDelta delay); + + bool RunsTasksOnCurrentThread() const; + + bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; + + void CleanupForTesting(); + + void SignalHasWorkForTesting(); + + int GetWorkSignalCountForTesting() const; + + void Shutdown(int max_blocking_tasks_after_shutdown); + + bool IsShutdownInProgress(); + + // Runs the worker loop on the background thread. + void ThreadLoop(Worker* this_worker); + + private: + enum GetWorkStatus { + GET_WORK_FOUND, + GET_WORK_NOT_FOUND, + GET_WORK_WAIT, + }; + + enum CleanupState { + CLEANUP_REQUESTED, + CLEANUP_STARTING, + CLEANUP_RUNNING, + CLEANUP_FINISHING, + CLEANUP_DONE, + }; + + // Called from within the lock, this converts the given token name into a + // token ID, creating a new one if necessary. + int LockedGetNamedTokenID(const std::string& name); + + // Called from within the lock, this returns the next sequence task number. + int64 LockedGetNextSequenceTaskNumber(); + + // Called from within the lock, returns the shutdown behavior of the task + // running on the currently executing worker thread. If invoked from a thread + // that is not one of the workers, returns CONTINUE_ON_SHUTDOWN. + WorkerShutdown LockedCurrentThreadShutdownBehavior() const; + + // Gets new task. There are 3 cases depending on the return value: + // + // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should + // be run immediately. + // 2) If the return value is |GET_WORK_NOT_FOUND|, there are no tasks to run, + // and |task| is not filled in. In this case, the caller should wait until + // a task is posted. + // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run + // immediately, and |task| is not filled in. Likewise, |wait_time| is + // filled in the time to wait until the next task to run. In this case, the + // caller should wait the time. + // + // In any case, the calling code should clear the given + // delete_these_outside_lock vector the next time the lock is released. + // See the implementation for a more detailed description. + GetWorkStatus GetWork(SequencedTask* task, + TimeDelta* wait_time, + std::vector<Closure>* delete_these_outside_lock); + + void HandleCleanup(); + + // Peforms init and cleanup around running the given task. WillRun... + // returns the value from PrepareToStartAdditionalThreadIfNecessary. + // The calling code should call FinishStartingAdditionalThread once the + // lock is released if the return values is nonzero. + int WillRunWorkerTask(const SequencedTask& task); + void DidRunWorkerTask(const SequencedTask& task); + + // Returns true if there are no threads currently running the given + // sequence token. + bool IsSequenceTokenRunnable(int sequence_token_id) const; + + // Checks if all threads are busy and the addition of one more could run an + // additional task waiting in the queue. This must be called from within + // the lock. + // + // If another thread is helpful, this will mark the thread as being in the + // process of starting and returns the index of the new thread which will be + // 0 or more. The caller should then call FinishStartingAdditionalThread to + // complete initialization once the lock is released. + // + // If another thread is not necessary, returne 0; + // + // See the implementedion for more. + int PrepareToStartAdditionalThreadIfHelpful(); + + // The second part of thread creation after + // PrepareToStartAdditionalThreadIfHelpful with the thread number it + // generated. This actually creates the thread and should be called outside + // the lock to avoid blocking important work starting a thread in the lock. + void FinishStartingAdditionalThread(int thread_number); + + // Signal |has_work_| and increment |has_work_signal_count_|. + void SignalHasWork(); + + // Checks whether there is work left that's blocking shutdown. Must be + // called inside the lock. + bool CanShutdown() const; + + SequencedWorkerPool* const worker_pool_; + + // The last sequence number used. Managed by GetSequenceToken, since this + // only does threadsafe increment operations, you do not need to hold the + // lock. This is class-static to make SequenceTokens issued by + // GetSequenceToken unique across SequencedWorkerPool instances. + static base::StaticAtomicSequenceNumber g_last_sequence_number_; + + // This lock protects |everything in this class|. Do not read or modify + // anything without holding this lock. Do not block while holding this + // lock. + mutable Lock lock_; + + // Condition variable that is waited on by worker threads until new + // tasks are posted or shutdown starts. + ConditionVariable has_work_cv_; + + // Condition variable that is waited on by non-worker threads (in + // Shutdown()) until CanShutdown() goes to true. + ConditionVariable can_shutdown_cv_; + + // The maximum number of worker threads we'll create. + const size_t max_threads_; + + const std::string thread_name_prefix_; + + // Associates all known sequence token names with their IDs. + std::map<std::string, int> named_sequence_tokens_; + + // Owning pointers to all threads we've created so far, indexed by + // ID. Since we lazily create threads, this may be less than + // max_threads_ and will be initially empty. + typedef std::map<PlatformThreadId, linked_ptr<Worker> > ThreadMap; + ThreadMap threads_; + + // Set to true when we're in the process of creating another thread. + // See PrepareToStartAdditionalThreadIfHelpful for more. + bool thread_being_created_; + + // Number of threads currently waiting for work. + size_t waiting_thread_count_; + + // Number of threads currently running tasks that have the BLOCK_SHUTDOWN + // or SKIP_ON_SHUTDOWN flag set. + size_t blocking_shutdown_thread_count_; + + // A set of all pending tasks in time-to-run order. These are tasks that are + // either waiting for a thread to run on, waiting for their time to run, + // or blocked on a previous task in their sequence. We have to iterate over + // the tasks by time-to-run order, so we use the set instead of the + // traditional priority_queue. + typedef std::set<SequencedTask, SequencedTaskLessThan> PendingTaskSet; + PendingTaskSet pending_tasks_; + + // The next sequence number for a new sequenced task. + int64 next_sequence_task_number_; + + // Number of tasks in the pending_tasks_ list that are marked as blocking + // shutdown. + size_t blocking_shutdown_pending_task_count_; + + // Lists all sequence tokens currently executing. + std::set<int> current_sequences_; + + // An ID for each posted task to distinguish the task from others in traces. + int trace_id_; + + // Set when Shutdown is called and no further tasks should be + // allowed, though we may still be running existing tasks. + bool shutdown_called_; + + // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shudown() + // has been called. + int max_blocking_tasks_after_shutdown_; + + // State used to cleanup for testing, all guarded by lock_. + CleanupState cleanup_state_; + size_t cleanup_idlers_; + ConditionVariable cleanup_cv_; + + TestingObserver* const testing_observer_; + + DISALLOW_COPY_AND_ASSIGN(Inner); +}; + +// Worker definitions --------------------------------------------------------- + +SequencedWorkerPool::Worker::Worker( + const scoped_refptr<SequencedWorkerPool>& worker_pool, + int thread_number, + const std::string& prefix) + : SimpleThread( + prefix + StringPrintf("Worker%d", thread_number).c_str()), + worker_pool_(worker_pool), + running_shutdown_behavior_(CONTINUE_ON_SHUTDOWN) { + Start(); +} + +SequencedWorkerPool::Worker::~Worker() { +} + +void SequencedWorkerPool::Worker::Run() { + // Store a pointer to the running sequence in thread local storage for + // static function access. + g_lazy_tls_ptr.Get().Set(&running_sequence_); + + // Just jump back to the Inner object to run the thread, since it has all the + // tracking information and queues. It might be more natural to implement + // using DelegateSimpleThread and have Inner implement the Delegate to avoid + // having these worker objects at all, but that method lacks the ability to + // send thread-specific information easily to the thread loop. + worker_pool_->inner_->ThreadLoop(this); + // Release our cyclic reference once we're done. + worker_pool_ = NULL; +} + +// Inner definitions --------------------------------------------------------- + +SequencedWorkerPool::Inner::Inner( + SequencedWorkerPool* worker_pool, + size_t max_threads, + const std::string& thread_name_prefix, + TestingObserver* observer) + : worker_pool_(worker_pool), + lock_(), + has_work_cv_(&lock_), + can_shutdown_cv_(&lock_), + max_threads_(max_threads), + thread_name_prefix_(thread_name_prefix), + thread_being_created_(false), + waiting_thread_count_(0), + blocking_shutdown_thread_count_(0), + next_sequence_task_number_(0), + blocking_shutdown_pending_task_count_(0), + trace_id_(0), + shutdown_called_(false), + max_blocking_tasks_after_shutdown_(0), + cleanup_state_(CLEANUP_DONE), + cleanup_idlers_(0), + cleanup_cv_(&lock_), + testing_observer_(observer) {} + +SequencedWorkerPool::Inner::~Inner() { + // You must call Shutdown() before destroying the pool. + DCHECK(shutdown_called_); + + // Need to explicitly join with the threads before they're destroyed or else + // they will be running when our object is half torn down. + for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it) + it->second->Join(); + threads_.clear(); + + if (testing_observer_) + testing_observer_->OnDestruct(); +} + +SequencedWorkerPool::SequenceToken +SequencedWorkerPool::Inner::GetSequenceToken() { + // Need to add one because StaticAtomicSequenceNumber starts at zero, which + // is used as a sentinel value in SequenceTokens. + return SequenceToken(g_last_sequence_number_.GetNext() + 1); +} + +SequencedWorkerPool::SequenceToken +SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) { + AutoLock lock(lock_); + return SequenceToken(LockedGetNamedTokenID(name)); +} + +bool SequencedWorkerPool::Inner::PostTask( + const std::string* optional_token_name, + SequenceToken sequence_token, + WorkerShutdown shutdown_behavior, + const tracked_objects::Location& from_here, + const Closure& task, + TimeDelta delay) { + DCHECK(delay == TimeDelta() || shutdown_behavior == SKIP_ON_SHUTDOWN); + SequencedTask sequenced(from_here); + sequenced.sequence_token_id = sequence_token.id_; + sequenced.shutdown_behavior = shutdown_behavior; + sequenced.posted_from = from_here; + sequenced.task = + shutdown_behavior == BLOCK_SHUTDOWN ? + base::MakeCriticalClosure(task) : task; + sequenced.time_to_run = TimeTicks::Now() + delay; + + int create_thread_id = 0; + { + AutoLock lock(lock_); + if (shutdown_called_) { + if (shutdown_behavior != BLOCK_SHUTDOWN || + LockedCurrentThreadShutdownBehavior() == CONTINUE_ON_SHUTDOWN) { + return false; + } + if (max_blocking_tasks_after_shutdown_ <= 0) { + DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed"; + return false; + } + max_blocking_tasks_after_shutdown_ -= 1; + } + + // The trace_id is used for identifying the task in about:tracing. + sequenced.trace_id = trace_id_++; + + TRACE_EVENT_FLOW_BEGIN0("task", "SequencedWorkerPool::PostTask", + TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this)))); + + sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber(); + + // Now that we have the lock, apply the named token rules. + if (optional_token_name) + sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); + + pending_tasks_.insert(sequenced); + if (shutdown_behavior == BLOCK_SHUTDOWN) + blocking_shutdown_pending_task_count_++; + + create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); + } + + // Actually start the additional thread or signal an existing one now that + // we're outside the lock. + if (create_thread_id) + FinishStartingAdditionalThread(create_thread_id); + else + SignalHasWork(); + + return true; +} + +bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { + AutoLock lock(lock_); + return ContainsKey(threads_, PlatformThread::CurrentId()); +} + +bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( + SequenceToken sequence_token) const { + AutoLock lock(lock_); + ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); + if (found == threads_.end()) + return false; + return sequence_token.Equals(found->second->running_sequence()); +} + +// See https://code.google.com/p/chromium/issues/detail?id=168415 +void SequencedWorkerPool::Inner::CleanupForTesting() { + DCHECK(!RunsTasksOnCurrentThread()); + base::ThreadRestrictions::ScopedAllowWait allow_wait; + AutoLock lock(lock_); + CHECK_EQ(CLEANUP_DONE, cleanup_state_); + if (shutdown_called_) + return; + if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) + return; + cleanup_state_ = CLEANUP_REQUESTED; + cleanup_idlers_ = 0; + has_work_cv_.Signal(); + while (cleanup_state_ != CLEANUP_DONE) + cleanup_cv_.Wait(); +} + +void SequencedWorkerPool::Inner::SignalHasWorkForTesting() { + SignalHasWork(); +} + +void SequencedWorkerPool::Inner::Shutdown( + int max_new_blocking_tasks_after_shutdown) { + DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0); + { + AutoLock lock(lock_); + // Cleanup and Shutdown should not be called concurrently. + CHECK_EQ(CLEANUP_DONE, cleanup_state_); + if (shutdown_called_) + return; + shutdown_called_ = true; + max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; + + // Tickle the threads. This will wake up a waiting one so it will know that + // it can exit, which in turn will wake up any other waiting ones. + SignalHasWork(); + + // There are no pending or running tasks blocking shutdown, we're done. + if (CanShutdown()) + return; + } + + // If we're here, then something is blocking shutdown. So wait for + // CanShutdown() to go to true. + + if (testing_observer_) + testing_observer_->WillWaitForShutdown(); + +#if !defined(OS_NACL) + TimeTicks shutdown_wait_begin = TimeTicks::Now(); +#endif + + { + base::ThreadRestrictions::ScopedAllowWait allow_wait; + AutoLock lock(lock_); + while (!CanShutdown()) + can_shutdown_cv_.Wait(); + } +#if !defined(OS_NACL) + UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime", + TimeTicks::Now() - shutdown_wait_begin); +#endif +} + +bool SequencedWorkerPool::Inner::IsShutdownInProgress() { + AutoLock lock(lock_); + return shutdown_called_; +} + +void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { + { + AutoLock lock(lock_); + DCHECK(thread_being_created_); + thread_being_created_ = false; + std::pair<ThreadMap::iterator, bool> result = + threads_.insert( + std::make_pair(this_worker->tid(), make_linked_ptr(this_worker))); + DCHECK(result.second); + + while (true) { +#if defined(OS_MACOSX) + base::mac::ScopedNSAutoreleasePool autorelease_pool; +#endif + + HandleCleanup(); + + // See GetWork for what delete_these_outside_lock is doing. + SequencedTask task; + TimeDelta wait_time; + std::vector<Closure> delete_these_outside_lock; + GetWorkStatus status = + GetWork(&task, &wait_time, &delete_these_outside_lock); + if (status == GET_WORK_FOUND) { + TRACE_EVENT_FLOW_END0("task", "SequencedWorkerPool::PostTask", + TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this)))); + TRACE_EVENT2("task", "SequencedWorkerPool::ThreadLoop", + "src_file", task.posted_from.file_name(), + "src_func", task.posted_from.function_name()); + int new_thread_id = WillRunWorkerTask(task); + { + AutoUnlock unlock(lock_); + // There may be more work available, so wake up another + // worker thread. (Technically not required, since we + // already get a signal for each new task, but it doesn't + // hurt.) + SignalHasWork(); + delete_these_outside_lock.clear(); + + // Complete thread creation outside the lock if necessary. + if (new_thread_id) + FinishStartingAdditionalThread(new_thread_id); + + this_worker->set_running_task_info( + SequenceToken(task.sequence_token_id), task.shutdown_behavior); + + tracked_objects::TrackedTime start_time = + tracked_objects::ThreadData::NowForStartOfRun(task.birth_tally); + + task.task.Run(); + + tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(task, + start_time, tracked_objects::ThreadData::NowForEndOfRun()); + + // Make sure our task is erased outside the lock for the + // same reason we do this with delete_these_oustide_lock. + // Also, do it before calling set_running_task_info() so + // that sequence-checking from within the task's destructor + // still works. + task.task = Closure(); + + this_worker->set_running_task_info( + SequenceToken(), CONTINUE_ON_SHUTDOWN); + } + DidRunWorkerTask(task); // Must be done inside the lock. + } else if (cleanup_state_ == CLEANUP_RUNNING) { + switch (status) { + case GET_WORK_WAIT: { + AutoUnlock unlock(lock_); + delete_these_outside_lock.clear(); + } + break; + case GET_WORK_NOT_FOUND: + CHECK(delete_these_outside_lock.empty()); + cleanup_state_ = CLEANUP_FINISHING; + cleanup_cv_.Broadcast(); + break; + default: + NOTREACHED(); + } + } else { + // When we're terminating and there's no more work, we can + // shut down, other workers can complete any pending or new tasks. + // We can get additional tasks posted after shutdown_called_ is set + // but only worker threads are allowed to post tasks at that time, and + // the workers responsible for posting those tasks will be available + // to run them. Also, there may be some tasks stuck behind running + // ones with the same sequence token, but additional threads won't + // help this case. + if (shutdown_called_ && + blocking_shutdown_pending_task_count_ == 0) + break; + waiting_thread_count_++; + + switch (status) { + case GET_WORK_NOT_FOUND: + has_work_cv_.Wait(); + break; + case GET_WORK_WAIT: + has_work_cv_.TimedWait(wait_time); + break; + default: + NOTREACHED(); + } + waiting_thread_count_--; + } + } + } // Release lock_. + + // We noticed we should exit. Wake up the next worker so it knows it should + // exit as well (because the Shutdown() code only signals once). + SignalHasWork(); + + // Possibly unblock shutdown. + can_shutdown_cv_.Signal(); +} + +void SequencedWorkerPool::Inner::HandleCleanup() { + lock_.AssertAcquired(); + if (cleanup_state_ == CLEANUP_DONE) + return; + if (cleanup_state_ == CLEANUP_REQUESTED) { + // We win, we get to do the cleanup as soon as the others wise up and idle. + cleanup_state_ = CLEANUP_STARTING; + while (thread_being_created_ || + cleanup_idlers_ != threads_.size() - 1) { + has_work_cv_.Signal(); + cleanup_cv_.Wait(); + } + cleanup_state_ = CLEANUP_RUNNING; + return; + } + if (cleanup_state_ == CLEANUP_STARTING) { + // Another worker thread is cleaning up, we idle here until thats done. + ++cleanup_idlers_; + cleanup_cv_.Broadcast(); + while (cleanup_state_ != CLEANUP_FINISHING) { + cleanup_cv_.Wait(); + } + --cleanup_idlers_; + cleanup_cv_.Broadcast(); + return; + } + if (cleanup_state_ == CLEANUP_FINISHING) { + // We wait for all idlers to wake up prior to being DONE. + while (cleanup_idlers_ != 0) { + cleanup_cv_.Broadcast(); + cleanup_cv_.Wait(); + } + if (cleanup_state_ == CLEANUP_FINISHING) { + cleanup_state_ = CLEANUP_DONE; + cleanup_cv_.Signal(); + } + return; + } +} + +int SequencedWorkerPool::Inner::LockedGetNamedTokenID( + const std::string& name) { + lock_.AssertAcquired(); + DCHECK(!name.empty()); + + std::map<std::string, int>::const_iterator found = + named_sequence_tokens_.find(name); + if (found != named_sequence_tokens_.end()) + return found->second; // Got an existing one. + + // Create a new one for this name. + SequenceToken result = GetSequenceToken(); + named_sequence_tokens_.insert(std::make_pair(name, result.id_)); + return result.id_; +} + +int64 SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { + lock_.AssertAcquired(); + // We assume that we never create enough tasks to wrap around. + return next_sequence_task_number_++; +} + +SequencedWorkerPool::WorkerShutdown +SequencedWorkerPool::Inner::LockedCurrentThreadShutdownBehavior() const { + lock_.AssertAcquired(); + ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); + if (found == threads_.end()) + return CONTINUE_ON_SHUTDOWN; + return found->second->running_shutdown_behavior(); +} + +SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( + SequencedTask* task, + TimeDelta* wait_time, + std::vector<Closure>* delete_these_outside_lock) { + lock_.AssertAcquired(); + +#if !defined(OS_NACL) + UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount", + static_cast<int>(pending_tasks_.size())); +#endif + + // Find the next task with a sequence token that's not currently in use. + // If the token is in use, that means another thread is running something + // in that sequence, and we can't run it without going out-of-order. + // + // This algorithm is simple and fair, but inefficient in some cases. For + // example, say somebody schedules 1000 slow tasks with the same sequence + // number. We'll have to go through all those tasks each time we feel like + // there might be work to schedule. If this proves to be a problem, we + // should make this more efficient. + // + // One possible enhancement would be to keep a map from sequence ID to a + // list of pending but currently blocked SequencedTasks for that ID. + // When a worker finishes a task of one sequence token, it can pick up the + // next one from that token right away. + // + // This may lead to starvation if there are sufficient numbers of sequences + // in use. To alleviate this, we could add an incrementing priority counter + // to each SequencedTask. Then maintain a priority_queue of all runnable + // tasks, sorted by priority counter. When a sequenced task is completed + // we would pop the head element off of that tasks pending list and add it + // to the priority queue. Then we would run the first item in the priority + // queue. + + GetWorkStatus status = GET_WORK_NOT_FOUND; + int unrunnable_tasks = 0; + PendingTaskSet::iterator i = pending_tasks_.begin(); + // We assume that the loop below doesn't take too long and so we can just do + // a single call to TimeTicks::Now(). + const TimeTicks current_time = TimeTicks::Now(); + while (i != pending_tasks_.end()) { + if (!IsSequenceTokenRunnable(i->sequence_token_id)) { + unrunnable_tasks++; + ++i; + continue; + } + + if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { + // We're shutting down and the task we just found isn't blocking + // shutdown. Delete it and get more work. + // + // Note that we do not want to delete unrunnable tasks. Deleting a task + // can have side effects (like freeing some objects) and deleting a + // task that's supposed to run after one that's currently running could + // cause an obscure crash. + // + // We really want to delete these tasks outside the lock in case the + // closures are holding refs to objects that want to post work from + // their destructorss (which would deadlock). The closures are + // internally refcounted, so we just need to keep a copy of them alive + // until the lock is exited. The calling code can just clear() the + // vector they passed to us once the lock is exited to make this + // happen. + delete_these_outside_lock->push_back(i->task); + pending_tasks_.erase(i++); + continue; + } + + if (i->time_to_run > current_time) { + // The time to run has not come yet. + *wait_time = i->time_to_run - current_time; + status = GET_WORK_WAIT; + if (cleanup_state_ == CLEANUP_RUNNING) { + // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop. + delete_these_outside_lock->push_back(i->task); + pending_tasks_.erase(i); + } + break; + } + + // Found a runnable task. + *task = *i; + pending_tasks_.erase(i); + if (task->shutdown_behavior == BLOCK_SHUTDOWN) { + blocking_shutdown_pending_task_count_--; + } + + status = GET_WORK_FOUND; + break; + } + + // Track the number of tasks we had to skip over to see if we should be + // making this more efficient. If this number ever becomes large or is + // frequently "some", we should consider the optimization above. +#if !defined(OS_NACL) + UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.UnrunnableTaskCount", + unrunnable_tasks); +#endif + return status; +} + +int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { + lock_.AssertAcquired(); + + // Mark the task's sequence number as in use. + if (task.sequence_token_id) + current_sequences_.insert(task.sequence_token_id); + + // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN + // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread + // completes. + if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) + blocking_shutdown_thread_count_++; + + // We just picked up a task. Since StartAdditionalThreadIfHelpful only + // creates a new thread if there is no free one, there is a race when posting + // tasks that many tasks could have been posted before a thread started + // running them, so only one thread would have been created. So we also check + // whether we should create more threads after removing our task from the + // queue, which also has the nice side effect of creating the workers from + // background threads rather than the main thread of the app. + // + // If another thread wasn't created, we want to wake up an existing thread + // if there is one waiting to pick up the next task. + // + // Note that we really need to do this *before* running the task, not + // after. Otherwise, if more than one task is posted, the creation of the + // second thread (since we only create one at a time) will be blocked by + // the execution of the first task, which could be arbitrarily long. + return PrepareToStartAdditionalThreadIfHelpful(); +} + +void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { + lock_.AssertAcquired(); + + if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) { + DCHECK_GT(blocking_shutdown_thread_count_, 0u); + blocking_shutdown_thread_count_--; + } + + if (task.sequence_token_id) + current_sequences_.erase(task.sequence_token_id); +} + +bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( + int sequence_token_id) const { + lock_.AssertAcquired(); + return !sequence_token_id || + current_sequences_.find(sequence_token_id) == + current_sequences_.end(); +} + +int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { + lock_.AssertAcquired(); + // How thread creation works: + // + // We'de like to avoid creating threads with the lock held. However, we + // need to be sure that we have an accurate accounting of the threads for + // proper Joining and deltion on shutdown. + // + // We need to figure out if we need another thread with the lock held, which + // is what this function does. It then marks us as in the process of creating + // a thread. When we do shutdown, we wait until the thread_being_created_ + // flag is cleared, which ensures that the new thread is properly added to + // all the data structures and we can't leak it. Once shutdown starts, we'll + // refuse to create more threads or they would be leaked. + // + // Note that this creates a mostly benign race condition on shutdown that + // will cause fewer workers to be created than one would expect. It isn't + // much of an issue in real life, but affects some tests. Since we only spawn + // one worker at a time, the following sequence of events can happen: + // + // 1. Main thread posts a bunch of unrelated tasks that would normally be + // run on separate threads. + // 2. The first task post causes us to start a worker. Other tasks do not + // cause a worker to start since one is pending. + // 3. Main thread initiates shutdown. + // 4. No more threads are created since the shutdown_called_ flag is set. + // + // The result is that one may expect that max_threads_ workers to be created + // given the workload, but in reality fewer may be created because the + // sequence of thread creation on the background threads is racing with the + // shutdown call. + if (!shutdown_called_ && + !thread_being_created_ && + cleanup_state_ == CLEANUP_DONE && + threads_.size() < max_threads_ && + waiting_thread_count_ == 0) { + // We could use an additional thread if there's work to be done. + for (PendingTaskSet::const_iterator i = pending_tasks_.begin(); + i != pending_tasks_.end(); ++i) { + if (IsSequenceTokenRunnable(i->sequence_token_id)) { + // Found a runnable task, mark the thread as being started. + thread_being_created_ = true; + return static_cast<int>(threads_.size() + 1); + } + } + } + return 0; +} + +void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( + int thread_number) { + // Called outside of the lock. + DCHECK(thread_number > 0); + + // The worker is assigned to the list when the thread actually starts, which + // will manage the memory of the pointer. + new Worker(worker_pool_, thread_number, thread_name_prefix_); +} + +void SequencedWorkerPool::Inner::SignalHasWork() { + has_work_cv_.Signal(); + if (testing_observer_) { + testing_observer_->OnHasWork(); + } +} + +bool SequencedWorkerPool::Inner::CanShutdown() const { + lock_.AssertAcquired(); + // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. + return !thread_being_created_ && + blocking_shutdown_thread_count_ == 0 && + blocking_shutdown_pending_task_count_ == 0; +} + +base::StaticAtomicSequenceNumber +SequencedWorkerPool::Inner::g_last_sequence_number_; + +// SequencedWorkerPool -------------------------------------------------------- + +// static +SequencedWorkerPool::SequenceToken +SequencedWorkerPool::GetSequenceTokenForCurrentThread() { + // Don't construct lazy instance on check. + if (g_lazy_tls_ptr == NULL) + return SequenceToken(); + + SequencedWorkerPool::SequenceToken* token = g_lazy_tls_ptr.Get().Get(); + if (!token) + return SequenceToken(); + return *token; +} + +SequencedWorkerPool::SequencedWorkerPool( + size_t max_threads, + const std::string& thread_name_prefix) + : constructor_message_loop_(MessageLoopProxy::current()), + inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) { +} + +SequencedWorkerPool::SequencedWorkerPool( + size_t max_threads, + const std::string& thread_name_prefix, + TestingObserver* observer) + : constructor_message_loop_(MessageLoopProxy::current()), + inner_(new Inner(this, max_threads, thread_name_prefix, observer)) { +} + +SequencedWorkerPool::~SequencedWorkerPool() {} + +void SequencedWorkerPool::OnDestruct() const { + DCHECK(constructor_message_loop_.get()); + // Avoid deleting ourselves on a worker thread (which would + // deadlock). + if (RunsTasksOnCurrentThread()) { + constructor_message_loop_->DeleteSoon(FROM_HERE, this); + } else { + delete this; + } +} + +SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { + return inner_->GetSequenceToken(); +} + +SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( + const std::string& name) { + return inner_->GetNamedSequenceToken(name); +} + +scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner( + SequenceToken token) { + return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN); +} + +scoped_refptr<SequencedTaskRunner> +SequencedWorkerPool::GetSequencedTaskRunnerWithShutdownBehavior( + SequenceToken token, WorkerShutdown shutdown_behavior) { + return new SequencedWorkerPoolSequencedTaskRunner( + this, token, shutdown_behavior); +} + +scoped_refptr<TaskRunner> +SequencedWorkerPool::GetTaskRunnerWithShutdownBehavior( + WorkerShutdown shutdown_behavior) { + return new SequencedWorkerPoolTaskRunner(this, shutdown_behavior); +} + +bool SequencedWorkerPool::PostWorkerTask( + const tracked_objects::Location& from_here, + const Closure& task) { + return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN, + from_here, task, TimeDelta()); +} + +bool SequencedWorkerPool::PostDelayedWorkerTask( + const tracked_objects::Location& from_here, + const Closure& task, + TimeDelta delay) { + WorkerShutdown shutdown_behavior = + delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN; + return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior, + from_here, task, delay); +} + +bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior( + const tracked_objects::Location& from_here, + const Closure& task, + WorkerShutdown shutdown_behavior) { + return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior, + from_here, task, TimeDelta()); +} + +bool SequencedWorkerPool::PostSequencedWorkerTask( + SequenceToken sequence_token, + const tracked_objects::Location& from_here, + const Closure& task) { + return inner_->PostTask(NULL, sequence_token, BLOCK_SHUTDOWN, + from_here, task, TimeDelta()); +} + +bool SequencedWorkerPool::PostDelayedSequencedWorkerTask( + SequenceToken sequence_token, + const tracked_objects::Location& from_here, + const Closure& task, + TimeDelta delay) { + WorkerShutdown shutdown_behavior = + delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN; + return inner_->PostTask(NULL, sequence_token, shutdown_behavior, + from_here, task, delay); +} + +bool SequencedWorkerPool::PostNamedSequencedWorkerTask( + const std::string& token_name, + const tracked_objects::Location& from_here, + const Closure& task) { + DCHECK(!token_name.empty()); + return inner_->PostTask(&token_name, SequenceToken(), BLOCK_SHUTDOWN, + from_here, task, TimeDelta()); +} + +bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior( + SequenceToken sequence_token, + const tracked_objects::Location& from_here, + const Closure& task, + WorkerShutdown shutdown_behavior) { + return inner_->PostTask(NULL, sequence_token, shutdown_behavior, + from_here, task, TimeDelta()); +} + +bool SequencedWorkerPool::PostDelayedTask( + const tracked_objects::Location& from_here, + const Closure& task, + TimeDelta delay) { + return PostDelayedWorkerTask(from_here, task, delay); +} + +bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { + return inner_->RunsTasksOnCurrentThread(); +} + +bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( + SequenceToken sequence_token) const { + return inner_->IsRunningSequenceOnCurrentThread(sequence_token); +} + +void SequencedWorkerPool::FlushForTesting() { + inner_->CleanupForTesting(); +} + +void SequencedWorkerPool::SignalHasWorkForTesting() { + inner_->SignalHasWorkForTesting(); +} + +void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { + DCHECK(constructor_message_loop_->BelongsToCurrentThread()); + inner_->Shutdown(max_new_blocking_tasks_after_shutdown); +} + +bool SequencedWorkerPool::IsShutdownInProgress() { + return inner_->IsShutdownInProgress(); +} + +} // namespace base diff --git a/chromium/base/threading/sequenced_worker_pool.h b/chromium/base/threading/sequenced_worker_pool.h new file mode 100644 index 00000000000..1bd275b206d --- /dev/null +++ b/chromium/base/threading/sequenced_worker_pool.h @@ -0,0 +1,358 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef BASE_THREADING_SEQUENCED_WORKER_POOL_H_ +#define BASE_THREADING_SEQUENCED_WORKER_POOL_H_ + +#include <cstddef> +#include <string> + +#include "base/base_export.h" +#include "base/basictypes.h" +#include "base/callback_forward.h" +#include "base/memory/ref_counted.h" +#include "base/memory/scoped_ptr.h" +#include "base/task_runner.h" + +namespace tracked_objects { +class Location; +} // namespace tracked_objects + +namespace base { + +class MessageLoopProxy; + +template <class T> class DeleteHelper; + +class SequencedTaskRunner; + +// A worker thread pool that enforces ordering between sets of tasks. It also +// allows you to specify what should happen to your tasks on shutdown. +// +// To enforce ordering, get a unique sequence token from the pool and post all +// tasks you want to order with the token. All tasks with the same token are +// guaranteed to execute serially, though not necessarily on the same thread. +// This means that: +// +// - No two tasks with the same token will run at the same time. +// +// - Given two tasks T1 and T2 with the same token such that T2 will +// run after T1, then T2 will start after T1 is destroyed. +// +// - If T2 will run after T1, then all memory changes in T1 and T1's +// destruction will be visible to T2. +// +// Example: +// SequencedWorkerPool::SequenceToken token = pool.GetSequenceToken(); +// pool.PostSequencedWorkerTask(token, SequencedWorkerPool::SKIP_ON_SHUTDOWN, +// FROM_HERE, base::Bind(...)); +// pool.PostSequencedWorkerTask(token, SequencedWorkerPool::SKIP_ON_SHUTDOWN, +// FROM_HERE, base::Bind(...)); +// +// You can make named sequence tokens to make it easier to share a token +// across different components. +// +// You can also post tasks to the pool without ordering using PostWorkerTask. +// These will be executed in an unspecified order. The order of execution +// between tasks with different sequence tokens is also unspecified. +// +// This class may be leaked on shutdown to facilitate fast shutdown. The +// expected usage, however, is to call Shutdown(), which correctly accounts +// for CONTINUE_ON_SHUTDOWN behavior and is required for BLOCK_SHUTDOWN +// behavior. +// +// Implementation note: This does not use a base::WorkerPool since that does +// not enforce shutdown semantics or allow us to specify how many worker +// threads to run. For the typical use case of random background work, we don't +// necessarily want to be super aggressive about creating threads. +// +// Note that SequencedWorkerPool is RefCountedThreadSafe (inherited +// from TaskRunner). +class BASE_EXPORT SequencedWorkerPool : public TaskRunner { + public: + // Defines what should happen to a task posted to the worker pool on + // shutdown. + enum WorkerShutdown { + // Tasks posted with this mode which have not run at shutdown will be + // deleted rather than run, and any tasks with this mode running at + // shutdown will be ignored (the worker thread will not be joined). + // + // This option provides a nice way to post stuff you don't want blocking + // shutdown. For example, you might be doing a slow DNS lookup and if it's + // blocked on the OS, you may not want to stop shutdown, since the result + // doesn't really matter at that point. + // + // However, you need to be very careful what you do in your callback when + // you use this option. Since the thread will continue to run until the OS + // terminates the process, the app can be in the process of tearing down + // when you're running. This means any singletons or global objects you + // use may suddenly become invalid out from under you. For this reason, + // it's best to use this only for slow but simple operations like the DNS + // example. + CONTINUE_ON_SHUTDOWN, + + // Tasks posted with this mode that have not started executing at + // shutdown will be deleted rather than executed. However, any tasks that + // have already begun executing when shutdown is called will be allowed + // to continue, and will block shutdown until completion. + // + // Note: Because Shutdown() may block while these tasks are executing, + // care must be taken to ensure that they do not block on the thread that + // called Shutdown(), as this may lead to deadlock. + SKIP_ON_SHUTDOWN, + + // Tasks posted with this mode will block shutdown until they're + // executed. Since this can have significant performance implications, + // use sparingly. + // + // Generally, this should be used only for user data, for example, a task + // writing a preference file. + // + // If a task is posted during shutdown, it will not get run since the + // workers may already be stopped. In this case, the post operation will + // fail (return false) and the task will be deleted. + BLOCK_SHUTDOWN, + }; + + // Opaque identifier that defines sequencing of tasks posted to the worker + // pool. + class SequenceToken { + public: + SequenceToken() : id_(0) {} + ~SequenceToken() {} + + bool Equals(const SequenceToken& other) const { + return id_ == other.id_; + } + + // Returns false if current thread is executing an unsequenced task. + bool IsValid() const { + return id_ != 0; + } + + private: + friend class SequencedWorkerPool; + + explicit SequenceToken(int id) : id_(id) {} + + int id_; + }; + + // Allows tests to perform certain actions. + class TestingObserver { + public: + virtual ~TestingObserver() {} + virtual void OnHasWork() = 0; + virtual void WillWaitForShutdown() = 0; + virtual void OnDestruct() = 0; + }; + + // Gets the SequencedToken of the current thread. + // If current thread is not a SequencedWorkerPool worker thread or is running + // an unsequenced task, returns an invalid SequenceToken. + static SequenceToken GetSequenceTokenForCurrentThread(); + + // When constructing a SequencedWorkerPool, there must be a + // MessageLoop on the current thread unless you plan to deliberately + // leak it. + + // Pass the maximum number of threads (they will be lazily created as needed) + // and a prefix for the thread name to aid in debugging. + SequencedWorkerPool(size_t max_threads, + const std::string& thread_name_prefix); + + // Like above, but with |observer| for testing. Does not take + // ownership of |observer|. + SequencedWorkerPool(size_t max_threads, + const std::string& thread_name_prefix, + TestingObserver* observer); + + // Returns a unique token that can be used to sequence tasks posted to + // PostSequencedWorkerTask(). Valid tokens are always nonzero. + SequenceToken GetSequenceToken(); + + // Returns the sequence token associated with the given name. Calling this + // function multiple times with the same string will always produce the + // same sequence token. If the name has not been used before, a new token + // will be created. + SequenceToken GetNamedSequenceToken(const std::string& name); + + // Returns a SequencedTaskRunner wrapper which posts to this + // SequencedWorkerPool using the given sequence token. Tasks with nonzero + // delay are posted with SKIP_ON_SHUTDOWN behavior and tasks with zero delay + // are posted with BLOCK_SHUTDOWN behavior. + scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunner( + SequenceToken token); + + // Returns a SequencedTaskRunner wrapper which posts to this + // SequencedWorkerPool using the given sequence token. Tasks with nonzero + // delay are posted with SKIP_ON_SHUTDOWN behavior and tasks with zero delay + // are posted with the given shutdown behavior. + scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunnerWithShutdownBehavior( + SequenceToken token, + WorkerShutdown shutdown_behavior); + + // Returns a TaskRunner wrapper which posts to this SequencedWorkerPool using + // the given shutdown behavior. Tasks with nonzero delay are posted with + // SKIP_ON_SHUTDOWN behavior and tasks with zero delay are posted with the + // given shutdown behavior. + scoped_refptr<TaskRunner> GetTaskRunnerWithShutdownBehavior( + WorkerShutdown shutdown_behavior); + + // Posts the given task for execution in the worker pool. Tasks posted with + // this function will execute in an unspecified order on a background thread. + // Returns true if the task was posted. If your tasks have ordering + // requirements, see PostSequencedWorkerTask(). + // + // This class will attempt to delete tasks that aren't run + // (non-block-shutdown semantics) but can't guarantee that this happens. If + // all worker threads are busy running CONTINUE_ON_SHUTDOWN tasks, there + // will be no workers available to delete these tasks. And there may be + // tasks with the same sequence token behind those CONTINUE_ON_SHUTDOWN + // tasks. Deleting those tasks before the previous one has completed could + // cause nondeterministic crashes because the task could be keeping some + // objects alive which do work in their destructor, which could voilate the + // assumptions of the running task. + // + // The task will be guaranteed to run to completion before shutdown + // (BLOCK_SHUTDOWN semantics). + // + // Returns true if the task was posted successfully. This may fail during + // shutdown regardless of the specified ShutdownBehavior. + bool PostWorkerTask(const tracked_objects::Location& from_here, + const Closure& task); + + // Same as PostWorkerTask but allows a delay to be specified (although doing + // so changes the shutdown behavior). The task will be run after the given + // delay has elapsed. + // + // If the delay is nonzero, the task won't be guaranteed to run to completion + // before shutdown (SKIP_ON_SHUTDOWN semantics) to avoid shutdown hangs. + // If the delay is zero, this behaves exactly like PostWorkerTask, i.e. the + // task will be guaranteed to run to completion before shutdown + // (BLOCK_SHUTDOWN semantics). + bool PostDelayedWorkerTask(const tracked_objects::Location& from_here, + const Closure& task, + TimeDelta delay); + + // Same as PostWorkerTask but allows specification of the shutdown behavior. + bool PostWorkerTaskWithShutdownBehavior( + const tracked_objects::Location& from_here, + const Closure& task, + WorkerShutdown shutdown_behavior); + + // Like PostWorkerTask above, but provides sequencing semantics. This means + // that tasks posted with the same sequence token (see GetSequenceToken()) + // are guaranteed to execute in order. This is useful in cases where you're + // doing operations that may depend on previous ones, like appending to a + // file. + // + // The task will be guaranteed to run to completion before shutdown + // (BLOCK_SHUTDOWN semantics). + // + // Returns true if the task was posted successfully. This may fail during + // shutdown regardless of the specified ShutdownBehavior. + bool PostSequencedWorkerTask(SequenceToken sequence_token, + const tracked_objects::Location& from_here, + const Closure& task); + + // Like PostSequencedWorkerTask above, but allows you to specify a named + // token, which saves an extra call to GetNamedSequenceToken. + bool PostNamedSequencedWorkerTask(const std::string& token_name, + const tracked_objects::Location& from_here, + const Closure& task); + + // Same as PostSequencedWorkerTask but allows a delay to be specified + // (although doing so changes the shutdown behavior). The task will be run + // after the given delay has elapsed. + // + // If the delay is nonzero, the task won't be guaranteed to run to completion + // before shutdown (SKIP_ON_SHUTDOWN semantics) to avoid shutdown hangs. + // If the delay is zero, this behaves exactly like PostSequencedWorkerTask, + // i.e. the task will be guaranteed to run to completion before shutdown + // (BLOCK_SHUTDOWN semantics). + bool PostDelayedSequencedWorkerTask( + SequenceToken sequence_token, + const tracked_objects::Location& from_here, + const Closure& task, + TimeDelta delay); + + // Same as PostSequencedWorkerTask but allows specification of the shutdown + // behavior. + bool PostSequencedWorkerTaskWithShutdownBehavior( + SequenceToken sequence_token, + const tracked_objects::Location& from_here, + const Closure& task, + WorkerShutdown shutdown_behavior); + + // TaskRunner implementation. Forwards to PostDelayedWorkerTask(). + virtual bool PostDelayedTask(const tracked_objects::Location& from_here, + const Closure& task, + TimeDelta delay) OVERRIDE; + virtual bool RunsTasksOnCurrentThread() const OVERRIDE; + + // Returns true if the current thread is processing a task with the given + // sequence_token. + bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; + + // Blocks until all pending tasks are complete. This should only be called in + // unit tests when you want to validate something that should have happened. + // This will not flush delayed tasks; delayed tasks get deleted. + // + // Note that calling this will not prevent other threads from posting work to + // the queue while the calling thread is waiting on Flush(). In this case, + // Flush will return only when there's no more work in the queue. Normally, + // this doesn't come up since in a test, all the work is being posted from + // the main thread. + void FlushForTesting(); + + // Spuriously signal that there is work to be done. + void SignalHasWorkForTesting(); + + // Implements the worker pool shutdown. This should be called during app + // shutdown, and will discard/join with appropriate tasks before returning. + // After this call, subsequent calls to post tasks will fail. + // + // Must be called from the same thread this object was constructed on. + void Shutdown() { Shutdown(0); } + + // A variant that allows an arbitrary number of new blocking tasks to + // be posted during shutdown from within tasks that execute during shutdown. + // Only tasks designated as BLOCKING_SHUTDOWN will be allowed, and only if + // posted by tasks that are not designated as CONTINUE_ON_SHUTDOWN. Once + // the limit is reached, subsequent calls to post task fail in all cases. + // + // Must be called from the same thread this object was constructed on. + void Shutdown(int max_new_blocking_tasks_after_shutdown); + + // Check if Shutdown was called for given threading pool. This method is used + // for aborting time consuming operation to avoid blocking shutdown. + // + // Can be called from any thread. + bool IsShutdownInProgress(); + + protected: + virtual ~SequencedWorkerPool(); + + virtual void OnDestruct() const OVERRIDE; + + private: + friend class RefCountedThreadSafe<SequencedWorkerPool>; + friend class DeleteHelper<SequencedWorkerPool>; + + class Inner; + class Worker; + + const scoped_refptr<MessageLoopProxy> constructor_message_loop_; + + // Avoid pulling in too many headers by putting (almost) everything + // into |inner_|. + const scoped_ptr<Inner> inner_; + + DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPool); +}; + +} // namespace base + +#endif // BASE_THREADING_SEQUENCED_WORKER_POOL_H_ diff --git a/chromium/base/threading/sequenced_worker_pool_unittest.cc b/chromium/base/threading/sequenced_worker_pool_unittest.cc new file mode 100644 index 00000000000..a07fd479095 --- /dev/null +++ b/chromium/base/threading/sequenced_worker_pool_unittest.cc @@ -0,0 +1,936 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/sequenced_worker_pool.h" + +#include <algorithm> + +#include "base/bind.h" +#include "base/compiler_specific.h" +#include "base/memory/ref_counted.h" +#include "base/memory/scoped_ptr.h" +#include "base/message_loop/message_loop.h" +#include "base/message_loop/message_loop_proxy.h" +#include "base/synchronization/condition_variable.h" +#include "base/synchronization/lock.h" +#include "base/test/sequenced_task_runner_test_template.h" +#include "base/test/sequenced_worker_pool_owner.h" +#include "base/test/task_runner_test_template.h" +#include "base/test/test_timeouts.h" +#include "base/threading/platform_thread.h" +#include "base/time/time.h" +#include "base/tracked_objects.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace base { + +// IMPORTANT NOTE: +// +// Many of these tests have failure modes where they'll hang forever. These +// tests should not be flaky, and hanging indicates a type of failure. Do not +// mark as flaky if they're hanging, it's likely an actual bug. + +namespace { + +const size_t kNumWorkerThreads = 3; + +// Allows a number of threads to all be blocked on the same event, and +// provides a way to unblock a certain number of them. +class ThreadBlocker { + public: + ThreadBlocker() : lock_(), cond_var_(&lock_), unblock_counter_(0) {} + + void Block() { + { + base::AutoLock lock(lock_); + while (unblock_counter_ == 0) + cond_var_.Wait(); + unblock_counter_--; + } + cond_var_.Signal(); + } + + void Unblock(size_t count) { + { + base::AutoLock lock(lock_); + DCHECK(unblock_counter_ == 0); + unblock_counter_ = count; + } + cond_var_.Signal(); + } + + private: + base::Lock lock_; + base::ConditionVariable cond_var_; + + size_t unblock_counter_; +}; + +class TestTracker : public base::RefCountedThreadSafe<TestTracker> { + public: + TestTracker() + : lock_(), + cond_var_(&lock_), + started_events_(0) { + } + + // Each of these tasks appends the argument to the complete sequence vector + // so calling code can see what order they finished in. + void FastTask(int id) { + SignalWorkerDone(id); + } + + void SlowTask(int id) { + base::PlatformThread::Sleep(base::TimeDelta::FromSeconds(1)); + SignalWorkerDone(id); + } + + void BlockTask(int id, ThreadBlocker* blocker) { + // Note that this task has started and signal anybody waiting for that + // to happen. + { + base::AutoLock lock(lock_); + started_events_++; + } + cond_var_.Signal(); + + blocker->Block(); + SignalWorkerDone(id); + } + + void PostAdditionalTasks( + int id, SequencedWorkerPool* pool, + bool expected_return_value) { + Closure fast_task = base::Bind(&TestTracker::FastTask, this, 100); + EXPECT_EQ(expected_return_value, + pool->PostWorkerTaskWithShutdownBehavior( + FROM_HERE, fast_task, + SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); + EXPECT_EQ(expected_return_value, + pool->PostWorkerTaskWithShutdownBehavior( + FROM_HERE, fast_task, + SequencedWorkerPool::SKIP_ON_SHUTDOWN)); + pool->PostWorkerTaskWithShutdownBehavior( + FROM_HERE, fast_task, + SequencedWorkerPool::BLOCK_SHUTDOWN); + SignalWorkerDone(id); + } + + // Waits until the given number of tasks have started executing. + void WaitUntilTasksBlocked(size_t count) { + { + base::AutoLock lock(lock_); + while (started_events_ < count) + cond_var_.Wait(); + } + cond_var_.Signal(); + } + + // Blocks the current thread until at least the given number of tasks are in + // the completed vector, and then returns a copy. + std::vector<int> WaitUntilTasksComplete(size_t num_tasks) { + std::vector<int> ret; + { + base::AutoLock lock(lock_); + while (complete_sequence_.size() < num_tasks) + cond_var_.Wait(); + ret = complete_sequence_; + } + cond_var_.Signal(); + return ret; + } + + size_t GetTasksCompletedCount() { + base::AutoLock lock(lock_); + return complete_sequence_.size(); + } + + void ClearCompleteSequence() { + base::AutoLock lock(lock_); + complete_sequence_.clear(); + started_events_ = 0; + } + + private: + friend class base::RefCountedThreadSafe<TestTracker>; + ~TestTracker() {} + + void SignalWorkerDone(int id) { + { + base::AutoLock lock(lock_); + complete_sequence_.push_back(id); + } + cond_var_.Signal(); + } + + // Protects the complete_sequence. + base::Lock lock_; + + base::ConditionVariable cond_var_; + + // Protected by lock_. + std::vector<int> complete_sequence_; + + // Counter of the number of "block" workers that have started. + size_t started_events_; +}; + +class SequencedWorkerPoolTest : public testing::Test { + public: + SequencedWorkerPoolTest() + : tracker_(new TestTracker) { + ResetPool(); + } + + virtual ~SequencedWorkerPoolTest() {} + + virtual void SetUp() OVERRIDE {} + + virtual void TearDown() OVERRIDE { + pool()->Shutdown(); + } + + const scoped_refptr<SequencedWorkerPool>& pool() { + return pool_owner_->pool(); + } + TestTracker* tracker() { return tracker_.get(); } + + // Destroys the SequencedWorkerPool instance, blocking until it is fully shut + // down, and creates a new instance. + void ResetPool() { + pool_owner_.reset(new SequencedWorkerPoolOwner(kNumWorkerThreads, "test")); + } + + void SetWillWaitForShutdownCallback(const Closure& callback) { + pool_owner_->SetWillWaitForShutdownCallback(callback); + } + + // Ensures that the given number of worker threads is created by adding + // tasks and waiting until they complete. Worker thread creation is + // serialized, can happen on background threads asynchronously, and doesn't + // happen any more at shutdown. This means that if a test posts a bunch of + // tasks and calls shutdown, fewer workers will be created than the test may + // expect. + // + // This function ensures that this condition can't happen so tests can make + // assumptions about the number of workers active. See the comment in + // PrepareToStartAdditionalThreadIfNecessary in the .cc file for more + // details. + // + // It will post tasks to the queue with id -1. It also assumes this is the + // first thing called in a test since it will clear the complete_sequence_. + void EnsureAllWorkersCreated() { + // Create a bunch of threads, all waiting. This will cause that may + // workers to be created. + ThreadBlocker blocker; + for (size_t i = 0; i < kNumWorkerThreads; i++) { + pool()->PostWorkerTask(FROM_HERE, + base::Bind(&TestTracker::BlockTask, + tracker(), -1, &blocker)); + } + tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); + + // Now wake them up and wait until they're done. + blocker.Unblock(kNumWorkerThreads); + tracker()->WaitUntilTasksComplete(kNumWorkerThreads); + + // Clean up the task IDs we added. + tracker()->ClearCompleteSequence(); + } + + int has_work_call_count() const { + return pool_owner_->has_work_call_count(); + } + + private: + MessageLoop message_loop_; + scoped_ptr<SequencedWorkerPoolOwner> pool_owner_; + const scoped_refptr<TestTracker> tracker_; +}; + +// Checks that the given number of entries are in the tasks to complete of +// the given tracker, and then signals the given event the given number of +// times. This is used to wakt up blocked background threads before blocking +// on shutdown. +void EnsureTasksToCompleteCountAndUnblock(scoped_refptr<TestTracker> tracker, + size_t expected_tasks_to_complete, + ThreadBlocker* blocker, + size_t threads_to_awake) { + EXPECT_EQ( + expected_tasks_to_complete, + tracker->WaitUntilTasksComplete(expected_tasks_to_complete).size()); + + blocker->Unblock(threads_to_awake); +} + +class DeletionHelper : public base::RefCountedThreadSafe<DeletionHelper> { + public: + explicit DeletionHelper( + const scoped_refptr<base::RefCountedData<bool> >& deleted_flag) + : deleted_flag_(deleted_flag) { + } + + private: + friend class base::RefCountedThreadSafe<DeletionHelper>; + virtual ~DeletionHelper() { deleted_flag_->data = true; } + + const scoped_refptr<base::RefCountedData<bool> > deleted_flag_; + DISALLOW_COPY_AND_ASSIGN(DeletionHelper); +}; + +void HoldPoolReference(const scoped_refptr<base::SequencedWorkerPool>& pool, + const scoped_refptr<DeletionHelper>& helper) { + ADD_FAILURE() << "Should never run"; +} + +// Tests that delayed tasks are deleted upon shutdown of the pool. +TEST_F(SequencedWorkerPoolTest, DelayedTaskDuringShutdown) { + // Post something to verify the pool is started up. + EXPECT_TRUE(pool()->PostTask( + FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 1))); + + scoped_refptr<base::RefCountedData<bool> > deleted_flag( + new base::RefCountedData<bool>(false)); + + base::Time posted_at(base::Time::Now()); + // Post something that shouldn't run. + EXPECT_TRUE(pool()->PostDelayedTask( + FROM_HERE, + base::Bind(&HoldPoolReference, + pool(), + make_scoped_refptr(new DeletionHelper(deleted_flag))), + TestTimeouts::action_timeout())); + + std::vector<int> completion_sequence = tracker()->WaitUntilTasksComplete(1); + ASSERT_EQ(1u, completion_sequence.size()); + ASSERT_EQ(1, completion_sequence[0]); + + pool()->Shutdown(); + // Shutdown is asynchronous, so use ResetPool() to block until the pool is + // fully destroyed (and thus shut down). + ResetPool(); + + // Verify that we didn't block until the task was due. + ASSERT_LT(base::Time::Now() - posted_at, TestTimeouts::action_timeout()); + + // Verify that the deferred task has not only not run, but has also been + // destroyed. + ASSERT_TRUE(deleted_flag->data); +} + +// Tests that same-named tokens have the same ID. +TEST_F(SequencedWorkerPoolTest, NamedTokens) { + const std::string name1("hello"); + SequencedWorkerPool::SequenceToken token1 = + pool()->GetNamedSequenceToken(name1); + + SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken(); + + const std::string name3("goodbye"); + SequencedWorkerPool::SequenceToken token3 = + pool()->GetNamedSequenceToken(name3); + + // All 3 tokens should be different. + EXPECT_FALSE(token1.Equals(token2)); + EXPECT_FALSE(token1.Equals(token3)); + EXPECT_FALSE(token2.Equals(token3)); + + // Requesting the same name again should give the same value. + SequencedWorkerPool::SequenceToken token1again = + pool()->GetNamedSequenceToken(name1); + EXPECT_TRUE(token1.Equals(token1again)); + + SequencedWorkerPool::SequenceToken token3again = + pool()->GetNamedSequenceToken(name3); + EXPECT_TRUE(token3.Equals(token3again)); +} + +// Tests that posting a bunch of tasks (many more than the number of worker +// threads) runs them all. +TEST_F(SequencedWorkerPoolTest, LotsOfTasks) { + pool()->PostWorkerTask(FROM_HERE, + base::Bind(&TestTracker::SlowTask, tracker(), 0)); + + const size_t kNumTasks = 20; + for (size_t i = 1; i < kNumTasks; i++) { + pool()->PostWorkerTask(FROM_HERE, + base::Bind(&TestTracker::FastTask, tracker(), i)); + } + + std::vector<int> result = tracker()->WaitUntilTasksComplete(kNumTasks); + EXPECT_EQ(kNumTasks, result.size()); +} + +// Tests that posting a bunch of tasks (many more than the number of +// worker threads) to two pools simultaneously runs them all twice. +// This test is meant to shake out any concurrency issues between +// pools (like histograms). +TEST_F(SequencedWorkerPoolTest, LotsOfTasksTwoPools) { + SequencedWorkerPoolOwner pool1(kNumWorkerThreads, "test1"); + SequencedWorkerPoolOwner pool2(kNumWorkerThreads, "test2"); + + base::Closure slow_task = base::Bind(&TestTracker::SlowTask, tracker(), 0); + pool1.pool()->PostWorkerTask(FROM_HERE, slow_task); + pool2.pool()->PostWorkerTask(FROM_HERE, slow_task); + + const size_t kNumTasks = 20; + for (size_t i = 1; i < kNumTasks; i++) { + base::Closure fast_task = + base::Bind(&TestTracker::FastTask, tracker(), i); + pool1.pool()->PostWorkerTask(FROM_HERE, fast_task); + pool2.pool()->PostWorkerTask(FROM_HERE, fast_task); + } + + std::vector<int> result = + tracker()->WaitUntilTasksComplete(2*kNumTasks); + EXPECT_EQ(2 * kNumTasks, result.size()); + + pool2.pool()->Shutdown(); + pool1.pool()->Shutdown(); +} + +// Test that tasks with the same sequence token are executed in order but don't +// affect other tasks. +TEST_F(SequencedWorkerPoolTest, Sequence) { + // Fill all the worker threads except one. + const size_t kNumBackgroundTasks = kNumWorkerThreads - 1; + ThreadBlocker background_blocker; + for (size_t i = 0; i < kNumBackgroundTasks; i++) { + pool()->PostWorkerTask(FROM_HERE, + base::Bind(&TestTracker::BlockTask, + tracker(), i, &background_blocker)); + } + tracker()->WaitUntilTasksBlocked(kNumBackgroundTasks); + + // Create two tasks with the same sequence token, one that will block on the + // event, and one which will just complete quickly when it's run. Since there + // is one worker thread free, the first task will start and then block, and + // the second task should be waiting. + ThreadBlocker blocker; + SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken(); + pool()->PostSequencedWorkerTask( + token1, FROM_HERE, + base::Bind(&TestTracker::BlockTask, tracker(), 100, &blocker)); + pool()->PostSequencedWorkerTask( + token1, FROM_HERE, + base::Bind(&TestTracker::FastTask, tracker(), 101)); + EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); + + // Create another two tasks as above with a different token. These will be + // blocked since there are no slots to run. + SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken(); + pool()->PostSequencedWorkerTask( + token2, FROM_HERE, + base::Bind(&TestTracker::FastTask, tracker(), 200)); + pool()->PostSequencedWorkerTask( + token2, FROM_HERE, + base::Bind(&TestTracker::FastTask, tracker(), 201)); + EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); + + // Let one background task complete. This should then let both tasks of + // token2 run to completion in order. The second task of token1 should still + // be blocked. + background_blocker.Unblock(1); + std::vector<int> result = tracker()->WaitUntilTasksComplete(3); + ASSERT_EQ(3u, result.size()); + EXPECT_EQ(200, result[1]); + EXPECT_EQ(201, result[2]); + + // Finish the rest of the background tasks. This should leave some workers + // free with the second token1 task still blocked on the first. + background_blocker.Unblock(kNumBackgroundTasks - 1); + EXPECT_EQ(kNumBackgroundTasks + 2, + tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 2).size()); + + // Allow the first task of token1 to complete. This should run the second. + blocker.Unblock(1); + result = tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 4); + ASSERT_EQ(kNumBackgroundTasks + 4, result.size()); + EXPECT_EQ(100, result[result.size() - 2]); + EXPECT_EQ(101, result[result.size() - 1]); +} + +// Tests that any tasks posted after Shutdown are ignored. +// Disabled for flakiness. See http://crbug.com/166451. +TEST_F(SequencedWorkerPoolTest, DISABLED_IgnoresAfterShutdown) { + // Start tasks to take all the threads and block them. + EnsureAllWorkersCreated(); + ThreadBlocker blocker; + for (size_t i = 0; i < kNumWorkerThreads; i++) { + pool()->PostWorkerTask(FROM_HERE, + base::Bind(&TestTracker::BlockTask, + tracker(), i, &blocker)); + } + tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); + + SetWillWaitForShutdownCallback( + base::Bind(&EnsureTasksToCompleteCountAndUnblock, + scoped_refptr<TestTracker>(tracker()), 0, + &blocker, kNumWorkerThreads)); + + // Shutdown the worker pool. This should discard all non-blocking tasks. + const int kMaxNewBlockingTasksAfterShutdown = 100; + pool()->Shutdown(kMaxNewBlockingTasksAfterShutdown); + + int old_has_work_call_count = has_work_call_count(); + + std::vector<int> result = + tracker()->WaitUntilTasksComplete(kNumWorkerThreads); + + // The kNumWorkerThread items should have completed, in no particular order. + ASSERT_EQ(kNumWorkerThreads, result.size()); + for (size_t i = 0; i < kNumWorkerThreads; i++) { + EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) != + result.end()); + } + + // No further tasks, regardless of shutdown mode, should be allowed. + EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior( + FROM_HERE, + base::Bind(&TestTracker::FastTask, tracker(), 100), + SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); + EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior( + FROM_HERE, + base::Bind(&TestTracker::FastTask, tracker(), 101), + SequencedWorkerPool::SKIP_ON_SHUTDOWN)); + EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior( + FROM_HERE, + base::Bind(&TestTracker::FastTask, tracker(), 102), + SequencedWorkerPool::BLOCK_SHUTDOWN)); + + ASSERT_EQ(old_has_work_call_count, has_work_call_count()); +} + +TEST_F(SequencedWorkerPoolTest, AllowsAfterShutdown) { + // Test that <n> new blocking tasks are allowed provided they're posted + // by a running tasks. + EnsureAllWorkersCreated(); + ThreadBlocker blocker; + + // Start tasks to take all the threads and block them. + const int kNumBlockTasks = static_cast<int>(kNumWorkerThreads); + for (int i = 0; i < kNumBlockTasks; ++i) { + EXPECT_TRUE(pool()->PostWorkerTask( + FROM_HERE, + base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker))); + } + tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); + + // Queue up shutdown blocking tasks behind those which will attempt to post + // additional tasks when run, PostAdditionalTasks attemtps to post 3 + // new FastTasks, one for each shutdown_behavior. + const int kNumQueuedTasks = static_cast<int>(kNumWorkerThreads); + for (int i = 0; i < kNumQueuedTasks; ++i) { + EXPECT_TRUE(pool()->PostWorkerTaskWithShutdownBehavior( + FROM_HERE, + base::Bind(&TestTracker::PostAdditionalTasks, tracker(), i, pool(), + false), + SequencedWorkerPool::BLOCK_SHUTDOWN)); + } + + // Setup to open the floodgates from within Shutdown(). + SetWillWaitForShutdownCallback( + base::Bind(&EnsureTasksToCompleteCountAndUnblock, + scoped_refptr<TestTracker>(tracker()), + 0, &blocker, kNumBlockTasks)); + + // Allow half of the additional blocking tasks thru. + const int kNumNewBlockingTasksToAllow = kNumWorkerThreads / 2; + pool()->Shutdown(kNumNewBlockingTasksToAllow); + + // Ensure that the correct number of tasks actually got run. + tracker()->WaitUntilTasksComplete(static_cast<size_t>( + kNumBlockTasks + kNumQueuedTasks + kNumNewBlockingTasksToAllow)); + + // Clean up the task IDs we added and go home. + tracker()->ClearCompleteSequence(); +} + +// Tests that unrun tasks are discarded properly according to their shutdown +// mode. +TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) { + // Start tasks to take all the threads and block them. + EnsureAllWorkersCreated(); + ThreadBlocker blocker; + for (size_t i = 0; i < kNumWorkerThreads; i++) { + pool()->PostWorkerTask(FROM_HERE, + base::Bind(&TestTracker::BlockTask, + tracker(), i, &blocker)); + } + tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); + + // Create some tasks with different shutdown modes. + pool()->PostWorkerTaskWithShutdownBehavior( + FROM_HERE, + base::Bind(&TestTracker::FastTask, tracker(), 100), + SequencedWorkerPool::CONTINUE_ON_SHUTDOWN); + pool()->PostWorkerTaskWithShutdownBehavior( + FROM_HERE, + base::Bind(&TestTracker::FastTask, tracker(), 101), + SequencedWorkerPool::SKIP_ON_SHUTDOWN); + pool()->PostWorkerTaskWithShutdownBehavior( + FROM_HERE, + base::Bind(&TestTracker::FastTask, tracker(), 102), + SequencedWorkerPool::BLOCK_SHUTDOWN); + + // Shutdown the worker pool. This should discard all non-blocking tasks. + SetWillWaitForShutdownCallback( + base::Bind(&EnsureTasksToCompleteCountAndUnblock, + scoped_refptr<TestTracker>(tracker()), 0, + &blocker, kNumWorkerThreads)); + pool()->Shutdown(); + + std::vector<int> result = + tracker()->WaitUntilTasksComplete(kNumWorkerThreads + 1); + + // The kNumWorkerThread items should have completed, plus the BLOCK_SHUTDOWN + // one, in no particular order. + ASSERT_EQ(kNumWorkerThreads + 1, result.size()); + for (size_t i = 0; i < kNumWorkerThreads; i++) { + EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) != + result.end()); + } + EXPECT_TRUE(std::find(result.begin(), result.end(), 102) != result.end()); +} + +// Tests that CONTINUE_ON_SHUTDOWN tasks don't block shutdown. +TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) { + scoped_refptr<TaskRunner> runner(pool()->GetTaskRunnerWithShutdownBehavior( + SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); + scoped_refptr<SequencedTaskRunner> sequenced_runner( + pool()->GetSequencedTaskRunnerWithShutdownBehavior( + pool()->GetSequenceToken(), + SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); + EnsureAllWorkersCreated(); + ThreadBlocker blocker; + pool()->PostWorkerTaskWithShutdownBehavior( + FROM_HERE, + base::Bind(&TestTracker::BlockTask, + tracker(), 0, &blocker), + SequencedWorkerPool::CONTINUE_ON_SHUTDOWN); + runner->PostTask( + FROM_HERE, + base::Bind(&TestTracker::BlockTask, + tracker(), 1, &blocker)); + sequenced_runner->PostTask( + FROM_HERE, + base::Bind(&TestTracker::BlockTask, + tracker(), 2, &blocker)); + + tracker()->WaitUntilTasksBlocked(3); + + // This should not block. If this test hangs, it means it failed. + pool()->Shutdown(); + + // The task should not have completed yet. + EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); + + // Posting more tasks should fail. + EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior( + FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0), + SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); + EXPECT_FALSE(runner->PostTask( + FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0))); + EXPECT_FALSE(sequenced_runner->PostTask( + FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0))); + + // Continue the background thread and make sure the tasks can complete. + blocker.Unblock(3); + std::vector<int> result = tracker()->WaitUntilTasksComplete(3); + EXPECT_EQ(3u, result.size()); +} + +// Tests that SKIP_ON_SHUTDOWN tasks that have been started block Shutdown +// until they stop, but tasks not yet started do not. +TEST_F(SequencedWorkerPoolTest, SkipOnShutdown) { + // Start tasks to take all the threads and block them. + EnsureAllWorkersCreated(); + ThreadBlocker blocker; + + // Now block all the threads with SKIP_ON_SHUTDOWN. Shutdown() should not + // return until these tasks have completed. + for (size_t i = 0; i < kNumWorkerThreads; i++) { + pool()->PostWorkerTaskWithShutdownBehavior( + FROM_HERE, + base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker), + SequencedWorkerPool::SKIP_ON_SHUTDOWN); + } + tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); + + // Now post an additional task as SKIP_ON_SHUTDOWN, which should not be + // executed once Shutdown() has been called. + pool()->PostWorkerTaskWithShutdownBehavior( + FROM_HERE, + base::Bind(&TestTracker::BlockTask, + tracker(), 0, &blocker), + SequencedWorkerPool::SKIP_ON_SHUTDOWN); + + // This callback will only be invoked if SKIP_ON_SHUTDOWN tasks that have + // been started block shutdown. + SetWillWaitForShutdownCallback( + base::Bind(&EnsureTasksToCompleteCountAndUnblock, + scoped_refptr<TestTracker>(tracker()), 0, + &blocker, kNumWorkerThreads)); + + // No tasks should have completed yet. + EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); + + // This should not block. If this test hangs, it means it failed. + pool()->Shutdown(); + + // Shutdown should not return until all of the tasks have completed. + std::vector<int> result = + tracker()->WaitUntilTasksComplete(kNumWorkerThreads); + + // Only tasks marked SKIP_ON_SHUTDOWN that were already started should be + // allowed to complete. No additional non-blocking tasks should have been + // started. + ASSERT_EQ(kNumWorkerThreads, result.size()); + for (size_t i = 0; i < kNumWorkerThreads; i++) { + EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) != + result.end()); + } +} + +// Ensure all worker threads are created, and then trigger a spurious +// work signal. This shouldn't cause any other work signals to be +// triggered. This is a regression test for http://crbug.com/117469. +TEST_F(SequencedWorkerPoolTest, SpuriousWorkSignal) { + EnsureAllWorkersCreated(); + int old_has_work_call_count = has_work_call_count(); + pool()->SignalHasWorkForTesting(); + // This is inherently racy, but can only produce false positives. + base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(100)); + EXPECT_EQ(old_has_work_call_count + 1, has_work_call_count()); +} + +void IsRunningOnCurrentThreadTask( + SequencedWorkerPool::SequenceToken test_positive_token, + SequencedWorkerPool::SequenceToken test_negative_token, + SequencedWorkerPool* pool, + SequencedWorkerPool* unused_pool) { + EXPECT_TRUE(pool->RunsTasksOnCurrentThread()); + EXPECT_TRUE(pool->IsRunningSequenceOnCurrentThread(test_positive_token)); + EXPECT_FALSE(pool->IsRunningSequenceOnCurrentThread(test_negative_token)); + EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread()); + EXPECT_FALSE( + unused_pool->IsRunningSequenceOnCurrentThread(test_positive_token)); + EXPECT_FALSE( + unused_pool->IsRunningSequenceOnCurrentThread(test_negative_token)); +} + +// Verify correctness of the IsRunningSequenceOnCurrentThread method. +TEST_F(SequencedWorkerPoolTest, IsRunningOnCurrentThread) { + SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken(); + SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken(); + SequencedWorkerPool::SequenceToken unsequenced_token; + + scoped_refptr<SequencedWorkerPool> unused_pool = + new SequencedWorkerPool(2, "unused_pool"); + + EXPECT_FALSE(pool()->RunsTasksOnCurrentThread()); + EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token1)); + EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token2)); + EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(unsequenced_token)); + EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread()); + EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token1)); + EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token2)); + EXPECT_FALSE( + unused_pool->IsRunningSequenceOnCurrentThread(unsequenced_token)); + + pool()->PostSequencedWorkerTask( + token1, FROM_HERE, + base::Bind(&IsRunningOnCurrentThreadTask, + token1, token2, pool(), unused_pool)); + pool()->PostSequencedWorkerTask( + token2, FROM_HERE, + base::Bind(&IsRunningOnCurrentThreadTask, + token2, unsequenced_token, pool(), unused_pool)); + pool()->PostWorkerTask( + FROM_HERE, + base::Bind(&IsRunningOnCurrentThreadTask, + unsequenced_token, token1, pool(), unused_pool)); + pool()->Shutdown(); + unused_pool->Shutdown(); +} + +// Verify that FlushForTesting works as intended. +TEST_F(SequencedWorkerPoolTest, FlushForTesting) { + // Should be fine to call on a new instance. + pool()->FlushForTesting(); + + // Queue up a bunch of work, including a long delayed task and + // a task that produces additional tasks as an artifact. + pool()->PostDelayedWorkerTask( + FROM_HERE, + base::Bind(&TestTracker::FastTask, tracker(), 0), + TimeDelta::FromMinutes(5)); + pool()->PostWorkerTask(FROM_HERE, + base::Bind(&TestTracker::SlowTask, tracker(), 0)); + const size_t kNumFastTasks = 20; + for (size_t i = 0; i < kNumFastTasks; i++) { + pool()->PostWorkerTask(FROM_HERE, + base::Bind(&TestTracker::FastTask, tracker(), 0)); + } + pool()->PostWorkerTask( + FROM_HERE, + base::Bind(&TestTracker::PostAdditionalTasks, tracker(), 0, pool(), + true)); + + // We expect all except the delayed task to have been run. We verify all + // closures have been deleted by looking at the refcount of the + // tracker. + EXPECT_FALSE(tracker()->HasOneRef()); + pool()->FlushForTesting(); + EXPECT_TRUE(tracker()->HasOneRef()); + EXPECT_EQ(1 + kNumFastTasks + 1 + 3, tracker()->GetTasksCompletedCount()); + + // Should be fine to call on an idle instance with all threads created, and + // spamming the method shouldn't deadlock or confuse the class. + pool()->FlushForTesting(); + pool()->FlushForTesting(); + + // Should be fine to call after shutdown too. + pool()->Shutdown(); + pool()->FlushForTesting(); +} + +TEST(SequencedWorkerPoolRefPtrTest, ShutsDownCleanWithContinueOnShutdown) { + MessageLoop loop; + scoped_refptr<SequencedWorkerPool> pool(new SequencedWorkerPool(3, "Pool")); + scoped_refptr<SequencedTaskRunner> task_runner = + pool->GetSequencedTaskRunnerWithShutdownBehavior( + pool->GetSequenceToken(), + base::SequencedWorkerPool::CONTINUE_ON_SHUTDOWN); + + // Upon test exit, should shut down without hanging. + pool->Shutdown(); +} + +class SequencedWorkerPoolTaskRunnerTestDelegate { + public: + SequencedWorkerPoolTaskRunnerTestDelegate() {} + + ~SequencedWorkerPoolTaskRunnerTestDelegate() {} + + void StartTaskRunner() { + pool_owner_.reset( + new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest")); + } + + scoped_refptr<SequencedWorkerPool> GetTaskRunner() { + return pool_owner_->pool(); + } + + void StopTaskRunner() { + // Make sure all tasks are run before shutting down. Delayed tasks are + // not run, they're simply deleted. + pool_owner_->pool()->FlushForTesting(); + pool_owner_->pool()->Shutdown(); + // Don't reset |pool_owner_| here, as the test may still hold a + // reference to the pool. + } + + bool TaskRunnerHandlesNonZeroDelays() const { + return true; + } + + private: + MessageLoop message_loop_; + scoped_ptr<SequencedWorkerPoolOwner> pool_owner_; +}; + +INSTANTIATE_TYPED_TEST_CASE_P( + SequencedWorkerPool, TaskRunnerTest, + SequencedWorkerPoolTaskRunnerTestDelegate); + +class SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate { + public: + SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {} + + ~SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() { + } + + void StartTaskRunner() { + pool_owner_.reset( + new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest")); + task_runner_ = pool_owner_->pool()->GetTaskRunnerWithShutdownBehavior( + SequencedWorkerPool::BLOCK_SHUTDOWN); + } + + scoped_refptr<TaskRunner> GetTaskRunner() { + return task_runner_; + } + + void StopTaskRunner() { + // Make sure all tasks are run before shutting down. Delayed tasks are + // not run, they're simply deleted. + pool_owner_->pool()->FlushForTesting(); + pool_owner_->pool()->Shutdown(); + // Don't reset |pool_owner_| here, as the test may still hold a + // reference to the pool. + } + + bool TaskRunnerHandlesNonZeroDelays() const { + return true; + } + + private: + MessageLoop message_loop_; + scoped_ptr<SequencedWorkerPoolOwner> pool_owner_; + scoped_refptr<TaskRunner> task_runner_; +}; + +INSTANTIATE_TYPED_TEST_CASE_P( + SequencedWorkerPoolTaskRunner, TaskRunnerTest, + SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate); + +class SequencedWorkerPoolSequencedTaskRunnerTestDelegate { + public: + SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {} + + ~SequencedWorkerPoolSequencedTaskRunnerTestDelegate() { + } + + void StartTaskRunner() { + pool_owner_.reset(new SequencedWorkerPoolOwner( + 10, "SequencedWorkerPoolSequencedTaskRunnerTest")); + task_runner_ = pool_owner_->pool()->GetSequencedTaskRunner( + pool_owner_->pool()->GetSequenceToken()); + } + + scoped_refptr<SequencedTaskRunner> GetTaskRunner() { + return task_runner_; + } + + void StopTaskRunner() { + // Make sure all tasks are run before shutting down. Delayed tasks are + // not run, they're simply deleted. + pool_owner_->pool()->FlushForTesting(); + pool_owner_->pool()->Shutdown(); + // Don't reset |pool_owner_| here, as the test may still hold a + // reference to the pool. + } + + bool TaskRunnerHandlesNonZeroDelays() const { + return true; + } + + private: + MessageLoop message_loop_; + scoped_ptr<SequencedWorkerPoolOwner> pool_owner_; + scoped_refptr<SequencedTaskRunner> task_runner_; +}; + +INSTANTIATE_TYPED_TEST_CASE_P( + SequencedWorkerPoolSequencedTaskRunner, TaskRunnerTest, + SequencedWorkerPoolSequencedTaskRunnerTestDelegate); + +INSTANTIATE_TYPED_TEST_CASE_P( + SequencedWorkerPoolSequencedTaskRunner, SequencedTaskRunnerTest, + SequencedWorkerPoolSequencedTaskRunnerTestDelegate); + +} // namespace + +} // namespace base diff --git a/chromium/base/threading/simple_thread.cc b/chromium/base/threading/simple_thread.cc new file mode 100644 index 00000000000..028d4f4ab23 --- /dev/null +++ b/chromium/base/threading/simple_thread.cc @@ -0,0 +1,159 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/simple_thread.h" + +#include "base/logging.h" +#include "base/strings/string_number_conversions.h" +#include "base/threading/platform_thread.h" +#include "base/threading/thread_restrictions.h" + +namespace base { + +SimpleThread::SimpleThread(const std::string& name_prefix) + : name_prefix_(name_prefix), name_(name_prefix), + thread_(), event_(true, false), tid_(0), joined_(false) { +} + +SimpleThread::SimpleThread(const std::string& name_prefix, + const Options& options) + : name_prefix_(name_prefix), name_(name_prefix), options_(options), + thread_(), event_(true, false), tid_(0), joined_(false) { +} + +SimpleThread::~SimpleThread() { + DCHECK(HasBeenStarted()) << "SimpleThread was never started."; + DCHECK(HasBeenJoined()) << "SimpleThread destroyed without being Join()ed."; +} + +void SimpleThread::Start() { + DCHECK(!HasBeenStarted()) << "Tried to Start a thread multiple times."; + bool success = PlatformThread::Create(options_.stack_size(), this, &thread_); + DCHECK(success); + base::ThreadRestrictions::ScopedAllowWait allow_wait; + event_.Wait(); // Wait for the thread to complete initialization. +} + +void SimpleThread::Join() { + DCHECK(HasBeenStarted()) << "Tried to Join a never-started thread."; + DCHECK(!HasBeenJoined()) << "Tried to Join a thread multiple times."; + PlatformThread::Join(thread_); + joined_ = true; +} + +bool SimpleThread::HasBeenStarted() { + base::ThreadRestrictions::ScopedAllowWait allow_wait; + return event_.IsSignaled(); +} + +void SimpleThread::ThreadMain() { + tid_ = PlatformThread::CurrentId(); + // Construct our full name of the form "name_prefix_/TID". + name_.push_back('/'); + name_.append(IntToString(tid_)); + PlatformThread::SetName(name_.c_str()); + + // We've initialized our new thread, signal that we're done to Start(). + event_.Signal(); + + Run(); +} + +DelegateSimpleThread::DelegateSimpleThread(Delegate* delegate, + const std::string& name_prefix) + : SimpleThread(name_prefix), + delegate_(delegate) { +} + +DelegateSimpleThread::DelegateSimpleThread(Delegate* delegate, + const std::string& name_prefix, + const Options& options) + : SimpleThread(name_prefix, options), + delegate_(delegate) { +} + +DelegateSimpleThread::~DelegateSimpleThread() { +} + +void DelegateSimpleThread::Run() { + DCHECK(delegate_) << "Tried to call Run without a delegate (called twice?)"; + delegate_->Run(); + delegate_ = NULL; +} + +DelegateSimpleThreadPool::DelegateSimpleThreadPool( + const std::string& name_prefix, + int num_threads) + : name_prefix_(name_prefix), + num_threads_(num_threads), + dry_(true, false) { +} + +DelegateSimpleThreadPool::~DelegateSimpleThreadPool() { + DCHECK(threads_.empty()); + DCHECK(delegates_.empty()); + DCHECK(!dry_.IsSignaled()); +} + +void DelegateSimpleThreadPool::Start() { + DCHECK(threads_.empty()) << "Start() called with outstanding threads."; + for (int i = 0; i < num_threads_; ++i) { + DelegateSimpleThread* thread = new DelegateSimpleThread(this, name_prefix_); + thread->Start(); + threads_.push_back(thread); + } +} + +void DelegateSimpleThreadPool::JoinAll() { + DCHECK(!threads_.empty()) << "JoinAll() called with no outstanding threads."; + + // Tell all our threads to quit their worker loop. + AddWork(NULL, num_threads_); + + // Join and destroy all the worker threads. + for (int i = 0; i < num_threads_; ++i) { + threads_[i]->Join(); + delete threads_[i]; + } + threads_.clear(); + DCHECK(delegates_.empty()); +} + +void DelegateSimpleThreadPool::AddWork(Delegate* delegate, int repeat_count) { + AutoLock locked(lock_); + for (int i = 0; i < repeat_count; ++i) + delegates_.push(delegate); + // If we were empty, signal that we have work now. + if (!dry_.IsSignaled()) + dry_.Signal(); +} + +void DelegateSimpleThreadPool::Run() { + Delegate* work = NULL; + + while (true) { + dry_.Wait(); + { + AutoLock locked(lock_); + if (!dry_.IsSignaled()) + continue; + + DCHECK(!delegates_.empty()); + work = delegates_.front(); + delegates_.pop(); + + // Signal to any other threads that we're currently out of work. + if (delegates_.empty()) + dry_.Reset(); + } + + // A NULL delegate pointer signals us to quit. + if (!work) + break; + + work->Run(); + } +} + +} // namespace base diff --git a/chromium/base/threading/simple_thread.h b/chromium/base/threading/simple_thread.h new file mode 100644 index 00000000000..df03ce1c813 --- /dev/null +++ b/chromium/base/threading/simple_thread.h @@ -0,0 +1,190 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// WARNING: You should probably be using Thread (thread.h) instead. Thread is +// Chrome's message-loop based Thread abstraction, and if you are a +// thread running in the browser, there will likely be assumptions +// that your thread will have an associated message loop. +// +// This is a simple thread interface that backs to a native operating system +// thread. You should use this only when you want a thread that does not have +// an associated MessageLoop. Unittesting is the best example of this. +// +// The simplest interface to use is DelegateSimpleThread, which will create +// a new thread, and execute the Delegate's virtual Run() in this new thread +// until it has completed, exiting the thread. +// +// NOTE: You *MUST* call Join on the thread to clean up the underlying thread +// resources. You are also responsible for destructing the SimpleThread object. +// It is invalid to destroy a SimpleThread while it is running, or without +// Start() having been called (and a thread never created). The Delegate +// object should live as long as a DelegateSimpleThread. +// +// Thread Safety: A SimpleThread is not completely thread safe. It is safe to +// access it from the creating thread or from the newly created thread. This +// implies that the creator thread should be the thread that calls Join. +// +// Example: +// class MyThreadRunner : public DelegateSimpleThread::Delegate { ... }; +// MyThreadRunner runner; +// DelegateSimpleThread thread(&runner, "good_name_here"); +// thread.Start(); +// // Start will return after the Thread has been successfully started and +// // initialized. The newly created thread will invoke runner->Run(), and +// // run until it returns. +// thread.Join(); // Wait until the thread has exited. You *MUST* Join! +// // The SimpleThread object is still valid, however you may not call Join +// // or Start again. + +#ifndef BASE_THREADING_SIMPLE_THREAD_H_ +#define BASE_THREADING_SIMPLE_THREAD_H_ + +#include <string> +#include <queue> +#include <vector> + +#include "base/base_export.h" +#include "base/basictypes.h" +#include "base/compiler_specific.h" +#include "base/threading/platform_thread.h" +#include "base/synchronization/lock.h" +#include "base/synchronization/waitable_event.h" + +namespace base { + +// This is the base SimpleThread. You can derive from it and implement the +// virtual Run method, or you can use the DelegateSimpleThread interface. +class BASE_EXPORT SimpleThread : public PlatformThread::Delegate { + public: + class BASE_EXPORT Options { + public: + Options() : stack_size_(0) { } + ~Options() { } + + // We use the standard compiler-supplied copy constructor. + + // A custom stack size, or 0 for the system default. + void set_stack_size(size_t size) { stack_size_ = size; } + size_t stack_size() const { return stack_size_; } + private: + size_t stack_size_; + }; + + // Create a SimpleThread. |options| should be used to manage any specific + // configuration involving the thread creation and management. + // Every thread has a name, in the form of |name_prefix|/TID, for example + // "my_thread/321". The thread will not be created until Start() is called. + explicit SimpleThread(const std::string& name_prefix); + SimpleThread(const std::string& name_prefix, const Options& options); + + virtual ~SimpleThread(); + + virtual void Start(); + virtual void Join(); + + // Subclasses should override the Run method. + virtual void Run() = 0; + + // Return the thread name prefix, or "unnamed" if none was supplied. + std::string name_prefix() { return name_prefix_; } + + // Return the completed name including TID, only valid after Start(). + std::string name() { return name_; } + + // Return the thread id, only valid after Start(). + PlatformThreadId tid() { return tid_; } + + // Return True if Start() has ever been called. + bool HasBeenStarted(); + + // Return True if Join() has evern been called. + bool HasBeenJoined() { return joined_; } + + // Overridden from PlatformThread::Delegate: + virtual void ThreadMain() OVERRIDE; + + // Only set priorities with a careful understanding of the consequences. + // This is meant for very limited use cases. + void SetThreadPriority(ThreadPriority priority) { + PlatformThread::SetThreadPriority(thread_, priority); + } + + private: + const std::string name_prefix_; + std::string name_; + const Options options_; + PlatformThreadHandle thread_; // PlatformThread handle, invalid after Join! + WaitableEvent event_; // Signaled if Start() was ever called. + PlatformThreadId tid_; // The backing thread's id. + bool joined_; // True if Join has been called. +}; + +class BASE_EXPORT DelegateSimpleThread : public SimpleThread { + public: + class BASE_EXPORT Delegate { + public: + Delegate() { } + virtual ~Delegate() { } + virtual void Run() = 0; + }; + + DelegateSimpleThread(Delegate* delegate, + const std::string& name_prefix); + DelegateSimpleThread(Delegate* delegate, + const std::string& name_prefix, + const Options& options); + + virtual ~DelegateSimpleThread(); + virtual void Run() OVERRIDE; + private: + Delegate* delegate_; +}; + +// DelegateSimpleThreadPool allows you to start up a fixed number of threads, +// and then add jobs which will be dispatched to the threads. This is +// convenient when you have a lot of small work that you want done +// multi-threaded, but don't want to spawn a thread for each small bit of work. +// +// You just call AddWork() to add a delegate to the list of work to be done. +// JoinAll() will make sure that all outstanding work is processed, and wait +// for everything to finish. You can reuse a pool, so you can call Start() +// again after you've called JoinAll(). +class BASE_EXPORT DelegateSimpleThreadPool + : public DelegateSimpleThread::Delegate { + public: + typedef DelegateSimpleThread::Delegate Delegate; + + DelegateSimpleThreadPool(const std::string& name_prefix, int num_threads); + virtual ~DelegateSimpleThreadPool(); + + // Start up all of the underlying threads, and start processing work if we + // have any. + void Start(); + + // Make sure all outstanding work is finished, and wait for and destroy all + // of the underlying threads in the pool. + void JoinAll(); + + // It is safe to AddWork() any time, before or after Start(). + // Delegate* should always be a valid pointer, NULL is reserved internally. + void AddWork(Delegate* work, int repeat_count); + void AddWork(Delegate* work) { + AddWork(work, 1); + } + + // We implement the Delegate interface, for running our internal threads. + virtual void Run() OVERRIDE; + + private: + const std::string name_prefix_; + int num_threads_; + std::vector<DelegateSimpleThread*> threads_; + std::queue<Delegate*> delegates_; + base::Lock lock_; // Locks delegates_ + WaitableEvent dry_; // Not signaled when there is no work to do. +}; + +} // namespace base + +#endif // BASE_THREADING_SIMPLE_THREAD_H_ diff --git a/chromium/base/threading/simple_thread_unittest.cc b/chromium/base/threading/simple_thread_unittest.cc new file mode 100644 index 00000000000..a744b8d4588 --- /dev/null +++ b/chromium/base/threading/simple_thread_unittest.cc @@ -0,0 +1,170 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/atomic_sequence_num.h" +#include "base/strings/string_number_conversions.h" +#include "base/synchronization/waitable_event.h" +#include "base/threading/simple_thread.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace base { + +namespace { + +class SetIntRunner : public DelegateSimpleThread::Delegate { + public: + SetIntRunner(int* ptr, int val) : ptr_(ptr), val_(val) { } + virtual ~SetIntRunner() { } + + virtual void Run() OVERRIDE { + *ptr_ = val_; + } + + private: + int* ptr_; + int val_; +}; + +class WaitEventRunner : public DelegateSimpleThread::Delegate { + public: + explicit WaitEventRunner(WaitableEvent* event) : event_(event) { } + virtual ~WaitEventRunner() { } + + virtual void Run() OVERRIDE { + EXPECT_FALSE(event_->IsSignaled()); + event_->Signal(); + EXPECT_TRUE(event_->IsSignaled()); + } + private: + WaitableEvent* event_; +}; + +class SeqRunner : public DelegateSimpleThread::Delegate { + public: + explicit SeqRunner(AtomicSequenceNumber* seq) : seq_(seq) { } + virtual void Run() OVERRIDE { + seq_->GetNext(); + } + + private: + AtomicSequenceNumber* seq_; +}; + +// We count up on a sequence number, firing on the event when we've hit our +// expected amount, otherwise we wait on the event. This will ensure that we +// have all threads outstanding until we hit our expected thread pool size. +class VerifyPoolRunner : public DelegateSimpleThread::Delegate { + public: + VerifyPoolRunner(AtomicSequenceNumber* seq, + int total, WaitableEvent* event) + : seq_(seq), total_(total), event_(event) { } + + virtual void Run() OVERRIDE { + if (seq_->GetNext() == total_) { + event_->Signal(); + } else { + event_->Wait(); + } + } + + private: + AtomicSequenceNumber* seq_; + int total_; + WaitableEvent* event_; +}; + +} // namespace + +TEST(SimpleThreadTest, CreateAndJoin) { + int stack_int = 0; + + SetIntRunner runner(&stack_int, 7); + EXPECT_EQ(0, stack_int); + + DelegateSimpleThread thread(&runner, "int_setter"); + EXPECT_FALSE(thread.HasBeenStarted()); + EXPECT_FALSE(thread.HasBeenJoined()); + EXPECT_EQ(0, stack_int); + + thread.Start(); + EXPECT_TRUE(thread.HasBeenStarted()); + EXPECT_FALSE(thread.HasBeenJoined()); + + thread.Join(); + EXPECT_TRUE(thread.HasBeenStarted()); + EXPECT_TRUE(thread.HasBeenJoined()); + EXPECT_EQ(7, stack_int); +} + +TEST(SimpleThreadTest, WaitForEvent) { + // Create a thread, and wait for it to signal us. + WaitableEvent event(true, false); + + WaitEventRunner runner(&event); + DelegateSimpleThread thread(&runner, "event_waiter"); + + EXPECT_FALSE(event.IsSignaled()); + thread.Start(); + event.Wait(); + EXPECT_TRUE(event.IsSignaled()); + thread.Join(); +} + +TEST(SimpleThreadTest, NamedWithOptions) { + WaitableEvent event(true, false); + + WaitEventRunner runner(&event); + SimpleThread::Options options; + DelegateSimpleThread thread(&runner, "event_waiter", options); + EXPECT_EQ(thread.name_prefix(), "event_waiter"); + EXPECT_FALSE(event.IsSignaled()); + + thread.Start(); + EXPECT_EQ(thread.name_prefix(), "event_waiter"); + EXPECT_EQ(thread.name(), + std::string("event_waiter/") + IntToString(thread.tid())); + event.Wait(); + + EXPECT_TRUE(event.IsSignaled()); + thread.Join(); + + // We keep the name and tid, even after the thread is gone. + EXPECT_EQ(thread.name_prefix(), "event_waiter"); + EXPECT_EQ(thread.name(), + std::string("event_waiter/") + IntToString(thread.tid())); +} + +TEST(SimpleThreadTest, ThreadPool) { + AtomicSequenceNumber seq; + SeqRunner runner(&seq); + DelegateSimpleThreadPool pool("seq_runner", 10); + + // Add work before we're running. + pool.AddWork(&runner, 300); + + EXPECT_EQ(seq.GetNext(), 0); + pool.Start(); + + // Add work while we're running. + pool.AddWork(&runner, 300); + + pool.JoinAll(); + + EXPECT_EQ(seq.GetNext(), 601); + + // We can reuse our pool. Verify that all 10 threads can actually run in + // parallel, so this test will only pass if there are actually 10 threads. + AtomicSequenceNumber seq2; + WaitableEvent event(true, false); + // Changing 9 to 10, for example, would cause us JoinAll() to never return. + VerifyPoolRunner verifier(&seq2, 9, &event); + pool.Start(); + + pool.AddWork(&verifier, 10); + + pool.JoinAll(); + EXPECT_EQ(seq2.GetNext(), 10); +} + +} // namespace base diff --git a/chromium/base/threading/thread.cc b/chromium/base/threading/thread.cc new file mode 100644 index 00000000000..00f303d3609 --- /dev/null +++ b/chromium/base/threading/thread.cc @@ -0,0 +1,221 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/thread.h" + +#include "base/bind.h" +#include "base/lazy_instance.h" +#include "base/third_party/dynamic_annotations/dynamic_annotations.h" +#include "base/threading/thread_id_name_manager.h" +#include "base/threading/thread_local.h" +#include "base/threading/thread_restrictions.h" +#include "base/synchronization/waitable_event.h" + +#if defined(OS_WIN) +#include "base/win/scoped_com_initializer.h" +#endif + +namespace base { + +namespace { + +// We use this thread-local variable to record whether or not a thread exited +// because its Stop method was called. This allows us to catch cases where +// MessageLoop::QuitWhenIdle() is called directly, which is unexpected when +// using a Thread to setup and run a MessageLoop. +base::LazyInstance<base::ThreadLocalBoolean> lazy_tls_bool = + LAZY_INSTANCE_INITIALIZER; + +} // namespace + +// This is used to trigger the message loop to exit. +void ThreadQuitHelper() { + MessageLoop::current()->QuitWhenIdle(); + Thread::SetThreadWasQuitProperly(true); +} + +// Used to pass data to ThreadMain. This structure is allocated on the stack +// from within StartWithOptions. +struct Thread::StartupData { + // We get away with a const reference here because of how we are allocated. + const Thread::Options& options; + + // Used to synchronize thread startup. + WaitableEvent event; + + explicit StartupData(const Options& opt) + : options(opt), + event(false, false) {} +}; + +Thread::Thread(const char* name) + : +#if defined(OS_WIN) + com_status_(NONE), +#endif + started_(false), + stopping_(false), + running_(false), + startup_data_(NULL), + thread_(0), + message_loop_(NULL), + thread_id_(kInvalidThreadId), + name_(name) { +} + +Thread::~Thread() { + Stop(); +} + +bool Thread::Start() { + Options options; +#if defined(OS_WIN) + if (com_status_ == STA) + options.message_loop_type = MessageLoop::TYPE_UI; +#endif + return StartWithOptions(options); +} + +bool Thread::StartWithOptions(const Options& options) { + DCHECK(!message_loop_); +#if defined(OS_WIN) + DCHECK((com_status_ != STA) || + (options.message_loop_type == MessageLoop::TYPE_UI)); +#endif + + SetThreadWasQuitProperly(false); + + StartupData startup_data(options); + startup_data_ = &startup_data; + + if (!PlatformThread::Create(options.stack_size, this, &thread_)) { + DLOG(ERROR) << "failed to create thread"; + startup_data_ = NULL; + return false; + } + + // Wait for the thread to start and initialize message_loop_ + base::ThreadRestrictions::ScopedAllowWait allow_wait; + startup_data.event.Wait(); + + // set it to NULL so we don't keep a pointer to some object on the stack. + startup_data_ = NULL; + started_ = true; + + DCHECK(message_loop_); + return true; +} + +void Thread::Stop() { + if (!started_) + return; + + StopSoon(); + + // Wait for the thread to exit. + // + // TODO(darin): Unfortunately, we need to keep message_loop_ around until + // the thread exits. Some consumers are abusing the API. Make them stop. + // + PlatformThread::Join(thread_); + + // The thread should NULL message_loop_ on exit. + DCHECK(!message_loop_); + + // The thread no longer needs to be joined. + started_ = false; + + stopping_ = false; +} + +void Thread::StopSoon() { + // We should only be called on the same thread that started us. + + // Reading thread_id_ without a lock can lead to a benign data race + // with ThreadMain, so we annotate it to stay silent under ThreadSanitizer. + DCHECK_NE(ANNOTATE_UNPROTECTED_READ(thread_id_), PlatformThread::CurrentId()); + + if (stopping_ || !message_loop_) + return; + + stopping_ = true; + message_loop_->PostTask(FROM_HERE, base::Bind(&ThreadQuitHelper)); +} + +bool Thread::IsRunning() const { + return running_; +} + +void Thread::SetPriority(ThreadPriority priority) { + // The thread must be started (and id known) for this to be + // compatible with all platforms. + DCHECK_NE(thread_id_, kInvalidThreadId); + PlatformThread::SetThreadPriority(thread_, priority); +} + +void Thread::Run(MessageLoop* message_loop) { + message_loop->Run(); +} + +void Thread::SetThreadWasQuitProperly(bool flag) { + lazy_tls_bool.Pointer()->Set(flag); +} + +bool Thread::GetThreadWasQuitProperly() { + bool quit_properly = true; +#ifndef NDEBUG + quit_properly = lazy_tls_bool.Pointer()->Get(); +#endif + return quit_properly; +} + +void Thread::ThreadMain() { + { + // The message loop for this thread. + MessageLoop message_loop(startup_data_->options.message_loop_type); + + // Complete the initialization of our Thread object. + thread_id_ = PlatformThread::CurrentId(); + PlatformThread::SetName(name_.c_str()); + ANNOTATE_THREAD_NAME(name_.c_str()); // Tell the name to race detector. + message_loop.set_thread_name(name_); + message_loop_ = &message_loop; + +#if defined(OS_WIN) + scoped_ptr<win::ScopedCOMInitializer> com_initializer; + if (com_status_ != NONE) { + com_initializer.reset((com_status_ == STA) ? + new win::ScopedCOMInitializer() : + new win::ScopedCOMInitializer(win::ScopedCOMInitializer::kMTA)); + } +#endif + + // Let the thread do extra initialization. + // Let's do this before signaling we are started. + Init(); + + running_ = true; + startup_data_->event.Signal(); + // startup_data_ can't be touched anymore since the starting thread is now + // unlocked. + + Run(message_loop_); + running_ = false; + + // Let the thread do extra cleanup. + CleanUp(); + +#if defined(OS_WIN) + com_initializer.reset(); +#endif + + // Assert that MessageLoop::Quit was called by ThreadQuitHelper. + DCHECK(GetThreadWasQuitProperly()); + + // We can't receive messages anymore. + message_loop_ = NULL; + } +} + +} // namespace base diff --git a/chromium/base/threading/thread.h b/chromium/base/threading/thread.h new file mode 100644 index 00000000000..98831b82e5c --- /dev/null +++ b/chromium/base/threading/thread.h @@ -0,0 +1,216 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef BASE_THREADING_THREAD_H_ +#define BASE_THREADING_THREAD_H_ + +#include <string> + +#include "base/base_export.h" +#include "base/message_loop/message_loop.h" +#include "base/message_loop/message_loop_proxy.h" +#include "base/threading/platform_thread.h" + +namespace base { + +// A simple thread abstraction that establishes a MessageLoop on a new thread. +// The consumer uses the MessageLoop of the thread to cause code to execute on +// the thread. When this object is destroyed the thread is terminated. All +// pending tasks queued on the thread's message loop will run to completion +// before the thread is terminated. +// +// WARNING! SUBCLASSES MUST CALL Stop() IN THEIR DESTRUCTORS! See ~Thread(). +// +// After the thread is stopped, the destruction sequence is: +// +// (1) Thread::CleanUp() +// (2) MessageLoop::~MessageLoop +// (3.b) MessageLoop::DestructionObserver::WillDestroyCurrentMessageLoop +class BASE_EXPORT Thread : PlatformThread::Delegate { + public: + struct Options { + Options() : message_loop_type(MessageLoop::TYPE_DEFAULT), stack_size(0) {} + Options(MessageLoop::Type type, size_t size) + : message_loop_type(type), stack_size(size) {} + + // Specifies the type of message loop that will be allocated on the thread. + MessageLoop::Type message_loop_type; + + // Specifies the maximum stack size that the thread is allowed to use. + // This does not necessarily correspond to the thread's initial stack size. + // A value of 0 indicates that the default maximum should be used. + size_t stack_size; + }; + + // Constructor. + // name is a display string to identify the thread. + explicit Thread(const char* name); + + // Destroys the thread, stopping it if necessary. + // + // NOTE: ALL SUBCLASSES OF Thread MUST CALL Stop() IN THEIR DESTRUCTORS (or + // guarantee Stop() is explicitly called before the subclass is destroyed). + // This is required to avoid a data race between the destructor modifying the + // vtable, and the thread's ThreadMain calling the virtual method Run(). It + // also ensures that the CleanUp() virtual method is called on the subclass + // before it is destructed. + virtual ~Thread(); + +#if defined(OS_WIN) + // Causes the thread to initialize COM. This must be called before calling + // Start() or StartWithOptions(). If |use_mta| is false, the thread is also + // started with a TYPE_UI message loop. It is an error to call + // init_com_with_mta(false) and then StartWithOptions() with any message loop + // type other than TYPE_UI. + void init_com_with_mta(bool use_mta) { + DCHECK(!started_); + com_status_ = use_mta ? MTA : STA; + } +#endif + + // Starts the thread. Returns true if the thread was successfully started; + // otherwise, returns false. Upon successful return, the message_loop() + // getter will return non-null. + // + // Note: This function can't be called on Windows with the loader lock held; + // i.e. during a DllMain, global object construction or destruction, atexit() + // callback. + bool Start(); + + // Starts the thread. Behaves exactly like Start in addition to allow to + // override the default options. + // + // Note: This function can't be called on Windows with the loader lock held; + // i.e. during a DllMain, global object construction or destruction, atexit() + // callback. + bool StartWithOptions(const Options& options); + + // Signals the thread to exit and returns once the thread has exited. After + // this method returns, the Thread object is completely reset and may be used + // as if it were newly constructed (i.e., Start may be called again). + // + // Stop may be called multiple times and is simply ignored if the thread is + // already stopped. + // + // NOTE: If you are a consumer of Thread, it is not necessary to call this + // before deleting your Thread objects, as the destructor will do it. + // IF YOU ARE A SUBCLASS OF Thread, YOU MUST CALL THIS IN YOUR DESTRUCTOR. + void Stop(); + + // Signals the thread to exit in the near future. + // + // WARNING: This function is not meant to be commonly used. Use at your own + // risk. Calling this function will cause message_loop() to become invalid in + // the near future. This function was created to workaround a specific + // deadlock on Windows with printer worker thread. In any other case, Stop() + // should be used. + // + // StopSoon should not be called multiple times as it is risky to do so. It + // could cause a timing issue in message_loop() access. Call Stop() to reset + // the thread object once it is known that the thread has quit. + void StopSoon(); + + // Returns the message loop for this thread. Use the MessageLoop's + // PostTask methods to execute code on the thread. This only returns + // non-null after a successful call to Start. After Stop has been called, + // this will return NULL. + // + // NOTE: You must not call this MessageLoop's Quit method directly. Use + // the Thread's Stop method instead. + // + MessageLoop* message_loop() const { return message_loop_; } + + // Returns a MessageLoopProxy for this thread. Use the MessageLoopProxy's + // PostTask methods to execute code on the thread. This only returns + // non-NULL after a successful call to Start. After Stop has been called, + // this will return NULL. Callers can hold on to this even after the thread + // is gone. + scoped_refptr<MessageLoopProxy> message_loop_proxy() const { + return message_loop_ ? message_loop_->message_loop_proxy() : NULL; + } + + // Returns the name of this thread (for display in debugger too). + const std::string& thread_name() const { return name_; } + + // The native thread handle. + PlatformThreadHandle thread_handle() { return thread_; } + + // The thread ID. + PlatformThreadId thread_id() const { return thread_id_; } + + // Returns true if the thread has been started, and not yet stopped. + bool IsRunning() const; + + // Sets the thread priority. The thread must already be started. + void SetPriority(ThreadPriority priority); + + protected: + // Called just prior to starting the message loop + virtual void Init() {} + + // Called to start the message loop + virtual void Run(MessageLoop* message_loop); + + // Called just after the message loop ends + virtual void CleanUp() {} + + static void SetThreadWasQuitProperly(bool flag); + static bool GetThreadWasQuitProperly(); + + void set_message_loop(MessageLoop* message_loop) { + message_loop_ = message_loop; + } + + private: +#if defined(OS_WIN) + enum ComStatus { + NONE, + STA, + MTA, + }; +#endif + + // PlatformThread::Delegate methods: + virtual void ThreadMain() OVERRIDE; + +#if defined(OS_WIN) + // Whether this thread needs to initialize COM, and if so, in what mode. + ComStatus com_status_; +#endif + + // Whether we successfully started the thread. + bool started_; + + // If true, we're in the middle of stopping, and shouldn't access + // |message_loop_|. It may non-NULL and invalid. + bool stopping_; + + // True while inside of Run(). + bool running_; + + // Used to pass data to ThreadMain. + struct StartupData; + StartupData* startup_data_; + + // The thread's handle. + PlatformThreadHandle thread_; + + // The thread's message loop. Valid only while the thread is alive. Set + // by the created thread. + MessageLoop* message_loop_; + + // Our thread's ID. + PlatformThreadId thread_id_; + + // The name of the thread. Used for debugging purposes. + std::string name_; + + friend void ThreadQuitHelper(); + + DISALLOW_COPY_AND_ASSIGN(Thread); +}; + +} // namespace base + +#endif // BASE_THREADING_THREAD_H_ diff --git a/chromium/base/threading/thread_checker.h b/chromium/base/threading/thread_checker.h new file mode 100644 index 00000000000..5a8ef2261d7 --- /dev/null +++ b/chromium/base/threading/thread_checker.h @@ -0,0 +1,85 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef BASE_THREADING_THREAD_CHECKER_H_ +#define BASE_THREADING_THREAD_CHECKER_H_ + +// Apart from debug builds, we also enable the thread checker in +// builds with DCHECK_ALWAYS_ON so that trybots and waterfall bots +// with this define will get the same level of thread checking as +// debug bots. +// +// Note that this does not perfectly match situations where DCHECK is +// enabled. For example a non-official release build may have +// DCHECK_ALWAYS_ON undefined (and therefore ThreadChecker would be +// disabled) but have DCHECKs enabled at runtime. +#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)) +#define ENABLE_THREAD_CHECKER 1 +#else +#define ENABLE_THREAD_CHECKER 0 +#endif + +#if ENABLE_THREAD_CHECKER +#include "base/threading/thread_checker_impl.h" +#endif + +namespace base { + +// Do nothing implementation, for use in release mode. +// +// Note: You should almost always use the ThreadChecker class to get the +// right version for your build configuration. +class ThreadCheckerDoNothing { + public: + bool CalledOnValidThread() const { + return true; + } + + void DetachFromThread() {} +}; + +// ThreadChecker is a helper class used to help verify that some methods of a +// class are called from the same thread. It provides identical functionality to +// base::NonThreadSafe, but it is meant to be held as a member variable, rather +// than inherited from base::NonThreadSafe. +// +// While inheriting from base::NonThreadSafe may give a clear indication about +// the thread-safety of a class, it may also lead to violations of the style +// guide with regard to multiple inheritence. The choice between having a +// ThreadChecker member and inheriting from base::NonThreadSafe should be based +// on whether: +// - Derived classes need to know the thread they belong to, as opposed to +// having that functionality fully encapsulated in the base class. +// - Derived classes should be able to reassign the base class to another +// thread, via DetachFromThread. +// +// If neither of these are true, then having a ThreadChecker member and calling +// CalledOnValidThread is the preferable solution. +// +// Example: +// class MyClass { +// public: +// void Foo() { +// DCHECK(thread_checker_.CalledOnValidThread()); +// ... (do stuff) ... +// } +// +// private: +// ThreadChecker thread_checker_; +// } +// +// In Release mode, CalledOnValidThread will always return true. +#if ENABLE_THREAD_CHECKER +class ThreadChecker : public ThreadCheckerImpl { +}; +#else +class ThreadChecker : public ThreadCheckerDoNothing { +}; +#endif // ENABLE_THREAD_CHECKER + +#undef ENABLE_THREAD_CHECKER + +} // namespace base + +#endif // BASE_THREADING_THREAD_CHECKER_H_ diff --git a/chromium/base/threading/thread_checker_impl.cc b/chromium/base/threading/thread_checker_impl.cc new file mode 100644 index 00000000000..985433e5f11 --- /dev/null +++ b/chromium/base/threading/thread_checker_impl.cc @@ -0,0 +1,34 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/thread_checker_impl.h" + +namespace base { + +ThreadCheckerImpl::ThreadCheckerImpl() + : valid_thread_id_(kInvalidThreadId) { + EnsureThreadIdAssigned(); +} + +ThreadCheckerImpl::~ThreadCheckerImpl() {} + +bool ThreadCheckerImpl::CalledOnValidThread() const { + EnsureThreadIdAssigned(); + AutoLock auto_lock(lock_); + return valid_thread_id_ == PlatformThread::CurrentId(); +} + +void ThreadCheckerImpl::DetachFromThread() { + AutoLock auto_lock(lock_); + valid_thread_id_ = kInvalidThreadId; +} + +void ThreadCheckerImpl::EnsureThreadIdAssigned() const { + AutoLock auto_lock(lock_); + if (valid_thread_id_ != kInvalidThreadId) + return; + valid_thread_id_ = PlatformThread::CurrentId(); +} + +} // namespace base diff --git a/chromium/base/threading/thread_checker_impl.h b/chromium/base/threading/thread_checker_impl.h new file mode 100644 index 00000000000..24361c83221 --- /dev/null +++ b/chromium/base/threading/thread_checker_impl.h @@ -0,0 +1,43 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef BASE_THREADING_THREAD_CHECKER_IMPL_H_ +#define BASE_THREADING_THREAD_CHECKER_IMPL_H_ + +#include "base/base_export.h" +#include "base/synchronization/lock.h" +#include "base/threading/platform_thread.h" + +namespace base { + +// Real implementation of ThreadChecker, for use in debug mode, or +// for temporary use in release mode (e.g. to CHECK on a threading issue +// seen only in the wild). +// +// Note: You should almost always use the ThreadChecker class to get the +// right version for your build configuration. +class BASE_EXPORT ThreadCheckerImpl { + public: + ThreadCheckerImpl(); + ~ThreadCheckerImpl(); + + bool CalledOnValidThread() const; + + // Changes the thread that is checked for in CalledOnValidThread. This may + // be useful when an object may be created on one thread and then used + // exclusively on another thread. + void DetachFromThread(); + + private: + void EnsureThreadIdAssigned() const; + + mutable base::Lock lock_; + // This is mutable so that CalledOnValidThread can set it. + // It's guarded by |lock_|. + mutable PlatformThreadId valid_thread_id_; +}; + +} // namespace base + +#endif // BASE_THREADING_THREAD_CHECKER_IMPL_H_ diff --git a/chromium/base/threading/thread_checker_unittest.cc b/chromium/base/threading/thread_checker_unittest.cc new file mode 100644 index 00000000000..ae96923e5ca --- /dev/null +++ b/chromium/base/threading/thread_checker_unittest.cc @@ -0,0 +1,183 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/basictypes.h" +#include "base/logging.h" +#include "base/memory/scoped_ptr.h" +#include "base/threading/thread_checker.h" +#include "base/threading/simple_thread.h" +#include "testing/gtest/include/gtest/gtest.h" + +// Duplicated from base/threading/thread_checker.h so that we can be +// good citizens there and undef the macro. +#if !defined(NDEBUG) || defined(DCHECK_ALWAYS_ON) +#define ENABLE_THREAD_CHECKER 1 +#else +#define ENABLE_THREAD_CHECKER 0 +#endif + +namespace base { + +namespace { + +// Simple class to exercise the basics of ThreadChecker. +// Both the destructor and DoStuff should verify that they were +// called on the same thread as the constructor. +class ThreadCheckerClass : public ThreadChecker { + public: + ThreadCheckerClass() {} + + // Verifies that it was called on the same thread as the constructor. + void DoStuff() { + DCHECK(CalledOnValidThread()); + } + + void DetachFromThread() { + ThreadChecker::DetachFromThread(); + } + + static void MethodOnDifferentThreadImpl(); + static void DetachThenCallFromDifferentThreadImpl(); + + private: + DISALLOW_COPY_AND_ASSIGN(ThreadCheckerClass); +}; + +// Calls ThreadCheckerClass::DoStuff on another thread. +class CallDoStuffOnThread : public base::SimpleThread { + public: + explicit CallDoStuffOnThread(ThreadCheckerClass* thread_checker_class) + : SimpleThread("call_do_stuff_on_thread"), + thread_checker_class_(thread_checker_class) { + } + + virtual void Run() OVERRIDE { + thread_checker_class_->DoStuff(); + } + + private: + ThreadCheckerClass* thread_checker_class_; + + DISALLOW_COPY_AND_ASSIGN(CallDoStuffOnThread); +}; + +// Deletes ThreadCheckerClass on a different thread. +class DeleteThreadCheckerClassOnThread : public base::SimpleThread { + public: + explicit DeleteThreadCheckerClassOnThread( + ThreadCheckerClass* thread_checker_class) + : SimpleThread("delete_thread_checker_class_on_thread"), + thread_checker_class_(thread_checker_class) { + } + + virtual void Run() OVERRIDE { + thread_checker_class_.reset(); + } + + private: + scoped_ptr<ThreadCheckerClass> thread_checker_class_; + + DISALLOW_COPY_AND_ASSIGN(DeleteThreadCheckerClassOnThread); +}; + +} // namespace + +TEST(ThreadCheckerTest, CallsAllowedOnSameThread) { + scoped_ptr<ThreadCheckerClass> thread_checker_class( + new ThreadCheckerClass); + + // Verify that DoStuff doesn't assert. + thread_checker_class->DoStuff(); + + // Verify that the destructor doesn't assert. + thread_checker_class.reset(); +} + +TEST(ThreadCheckerTest, DestructorAllowedOnDifferentThread) { + scoped_ptr<ThreadCheckerClass> thread_checker_class( + new ThreadCheckerClass); + + // Verify that the destructor doesn't assert + // when called on a different thread. + DeleteThreadCheckerClassOnThread delete_on_thread( + thread_checker_class.release()); + + delete_on_thread.Start(); + delete_on_thread.Join(); +} + +TEST(ThreadCheckerTest, DetachFromThread) { + scoped_ptr<ThreadCheckerClass> thread_checker_class( + new ThreadCheckerClass); + + // Verify that DoStuff doesn't assert when called on a different thread after + // a call to DetachFromThread. + thread_checker_class->DetachFromThread(); + CallDoStuffOnThread call_on_thread(thread_checker_class.get()); + + call_on_thread.Start(); + call_on_thread.Join(); +} + +#if GTEST_HAS_DEATH_TEST || !ENABLE_THREAD_CHECKER + +void ThreadCheckerClass::MethodOnDifferentThreadImpl() { + scoped_ptr<ThreadCheckerClass> thread_checker_class( + new ThreadCheckerClass); + + // DoStuff should assert in debug builds only when called on a + // different thread. + CallDoStuffOnThread call_on_thread(thread_checker_class.get()); + + call_on_thread.Start(); + call_on_thread.Join(); +} + +#if ENABLE_THREAD_CHECKER +TEST(ThreadCheckerDeathTest, MethodNotAllowedOnDifferentThreadInDebug) { + ASSERT_DEATH({ + ThreadCheckerClass::MethodOnDifferentThreadImpl(); + }, ""); +} +#else +TEST(ThreadCheckerTest, MethodAllowedOnDifferentThreadInRelease) { + ThreadCheckerClass::MethodOnDifferentThreadImpl(); +} +#endif // ENABLE_THREAD_CHECKER + +void ThreadCheckerClass::DetachThenCallFromDifferentThreadImpl() { + scoped_ptr<ThreadCheckerClass> thread_checker_class( + new ThreadCheckerClass); + + // DoStuff doesn't assert when called on a different thread + // after a call to DetachFromThread. + thread_checker_class->DetachFromThread(); + CallDoStuffOnThread call_on_thread(thread_checker_class.get()); + + call_on_thread.Start(); + call_on_thread.Join(); + + // DoStuff should assert in debug builds only after moving to + // another thread. + thread_checker_class->DoStuff(); +} + +#if ENABLE_THREAD_CHECKER +TEST(ThreadCheckerDeathTest, DetachFromThreadInDebug) { + ASSERT_DEATH({ + ThreadCheckerClass::DetachThenCallFromDifferentThreadImpl(); + }, ""); +} +#else +TEST(ThreadCheckerTest, DetachFromThreadInRelease) { + ThreadCheckerClass::DetachThenCallFromDifferentThreadImpl(); +} +#endif // ENABLE_THREAD_CHECKER + +#endif // GTEST_HAS_DEATH_TEST || !ENABLE_THREAD_CHECKER + +// Just in case we ever get lumped together with other compilation units. +#undef ENABLE_THREAD_CHECKER + +} // namespace base diff --git a/chromium/base/threading/thread_collision_warner.cc b/chromium/base/threading/thread_collision_warner.cc new file mode 100644 index 00000000000..547e11ca66f --- /dev/null +++ b/chromium/base/threading/thread_collision_warner.cc @@ -0,0 +1,64 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/thread_collision_warner.h" + +#include "base/logging.h" +#include "base/threading/platform_thread.h" + +namespace base { + +void DCheckAsserter::warn() { + NOTREACHED() << "Thread Collision"; +} + +static subtle::Atomic32 CurrentThread() { + const PlatformThreadId current_thread_id = PlatformThread::CurrentId(); + // We need to get the thread id into an atomic data type. This might be a + // truncating conversion, but any loss-of-information just increases the + // chance of a fault negative, not a false positive. + const subtle::Atomic32 atomic_thread_id = + static_cast<subtle::Atomic32>(current_thread_id); + + return atomic_thread_id; +} + +void ThreadCollisionWarner::EnterSelf() { + // If the active thread is 0 then I'll write the current thread ID + // if two or more threads arrive here only one will succeed to + // write on valid_thread_id_ the current thread ID. + subtle::Atomic32 current_thread_id = CurrentThread(); + + int previous_value = subtle::NoBarrier_CompareAndSwap(&valid_thread_id_, + 0, + current_thread_id); + if (previous_value != 0 && previous_value != current_thread_id) { + // gotcha! a thread is trying to use the same class and that is + // not current thread. + asserter_->warn(); + } + + subtle::NoBarrier_AtomicIncrement(&counter_, 1); +} + +void ThreadCollisionWarner::Enter() { + subtle::Atomic32 current_thread_id = CurrentThread(); + + if (subtle::NoBarrier_CompareAndSwap(&valid_thread_id_, + 0, + current_thread_id) != 0) { + // gotcha! another thread is trying to use the same class. + asserter_->warn(); + } + + subtle::NoBarrier_AtomicIncrement(&counter_, 1); +} + +void ThreadCollisionWarner::Leave() { + if (subtle::Barrier_AtomicIncrement(&counter_, -1) == 0) { + subtle::NoBarrier_Store(&valid_thread_id_, 0); + } +} + +} // namespace base diff --git a/chromium/base/threading/thread_collision_warner.h b/chromium/base/threading/thread_collision_warner.h new file mode 100644 index 00000000000..d509a58e8de --- /dev/null +++ b/chromium/base/threading/thread_collision_warner.h @@ -0,0 +1,244 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef BASE_THREADING_THREAD_COLLISION_WARNER_H_ +#define BASE_THREADING_THREAD_COLLISION_WARNER_H_ + +#include <memory> + +#include "base/atomicops.h" +#include "base/base_export.h" +#include "base/compiler_specific.h" + +// A helper class alongside macros to be used to verify assumptions about thread +// safety of a class. +// +// Example: Queue implementation non thread-safe but still usable if clients +// are synchronized somehow. +// +// In this case the macro DFAKE_SCOPED_LOCK has to be +// used, it checks that if a thread is inside the push/pop then +// noone else is still inside the pop/push +// +// class NonThreadSafeQueue { +// public: +// ... +// void push(int) { DFAKE_SCOPED_LOCK(push_pop_); ... } +// int pop() { DFAKE_SCOPED_LOCK(push_pop_); ... } +// ... +// private: +// DFAKE_MUTEX(push_pop_); +// }; +// +// +// Example: Queue implementation non thread-safe but still usable if clients +// are synchronized somehow, it calls a method to "protect" from +// a "protected" method +// +// In this case the macro DFAKE_SCOPED_RECURSIVE_LOCK +// has to be used, it checks that if a thread is inside the push/pop +// then noone else is still inside the pop/push +// +// class NonThreadSafeQueue { +// public: +// void push(int) { +// DFAKE_SCOPED_LOCK(push_pop_); +// ... +// } +// int pop() { +// DFAKE_SCOPED_RECURSIVE_LOCK(push_pop_); +// bar(); +// ... +// } +// void bar() { DFAKE_SCOPED_RECURSIVE_LOCK(push_pop_); ... } +// ... +// private: +// DFAKE_MUTEX(push_pop_); +// }; +// +// +// Example: Queue implementation not usable even if clients are synchronized, +// so only one thread in the class life cycle can use the two members +// push/pop. +// +// In this case the macro DFAKE_SCOPED_LOCK_THREAD_LOCKED pins the +// specified +// critical section the first time a thread enters push or pop, from +// that time on only that thread is allowed to execute push or pop. +// +// class NonThreadSafeQueue { +// public: +// ... +// void push(int) { DFAKE_SCOPED_LOCK_THREAD_LOCKED(push_pop_); ... } +// int pop() { DFAKE_SCOPED_LOCK_THREAD_LOCKED(push_pop_); ... } +// ... +// private: +// DFAKE_MUTEX(push_pop_); +// }; +// +// +// Example: Class that has to be contructed/destroyed on same thread, it has +// a "shareable" method (with external synchronization) and a not +// shareable method (even with external synchronization). +// +// In this case 3 Critical sections have to be defined +// +// class ExoticClass { +// public: +// ExoticClass() { DFAKE_SCOPED_LOCK_THREAD_LOCKED(ctor_dtor_); ... } +// ~ExoticClass() { DFAKE_SCOPED_LOCK_THREAD_LOCKED(ctor_dtor_); ... } +// +// void Shareable() { DFAKE_SCOPED_LOCK(shareable_section_); ... } +// void NotShareable() { DFAKE_SCOPED_LOCK_THREAD_LOCKED(ctor_dtor_); ... } +// ... +// private: +// DFAKE_MUTEX(ctor_dtor_); +// DFAKE_MUTEX(shareable_section_); +// }; + + +#if !defined(NDEBUG) + +// Defines a class member that acts like a mutex. It is used only as a +// verification tool. +#define DFAKE_MUTEX(obj) \ + mutable base::ThreadCollisionWarner obj +// Asserts the call is never called simultaneously in two threads. Used at +// member function scope. +#define DFAKE_SCOPED_LOCK(obj) \ + base::ThreadCollisionWarner::ScopedCheck s_check_##obj(&obj) +// Asserts the call is never called simultaneously in two threads. Used at +// member function scope. Same as DFAKE_SCOPED_LOCK but allows recursive locks. +#define DFAKE_SCOPED_RECURSIVE_LOCK(obj) \ + base::ThreadCollisionWarner::ScopedRecursiveCheck sr_check_##obj(&obj) +// Asserts the code is always executed in the same thread. +#define DFAKE_SCOPED_LOCK_THREAD_LOCKED(obj) \ + base::ThreadCollisionWarner::Check check_##obj(&obj) + +#else + +#define DFAKE_MUTEX(obj) typedef void InternalFakeMutexType##obj +#define DFAKE_SCOPED_LOCK(obj) ((void)0) +#define DFAKE_SCOPED_RECURSIVE_LOCK(obj) ((void)0) +#define DFAKE_SCOPED_LOCK_THREAD_LOCKED(obj) ((void)0) + +#endif + +namespace base { + +// The class ThreadCollisionWarner uses an Asserter to notify the collision +// AsserterBase is the interfaces and DCheckAsserter is the default asserter +// used. During the unit tests is used another class that doesn't "DCHECK" +// in case of collision (check thread_collision_warner_unittests.cc) +struct BASE_EXPORT AsserterBase { + virtual ~AsserterBase() {} + virtual void warn() = 0; +}; + +struct BASE_EXPORT DCheckAsserter : public AsserterBase { + virtual ~DCheckAsserter() {} + virtual void warn() OVERRIDE; +}; + +class BASE_EXPORT ThreadCollisionWarner { + public: + // The parameter asserter is there only for test purpose + explicit ThreadCollisionWarner(AsserterBase* asserter = new DCheckAsserter()) + : valid_thread_id_(0), + counter_(0), + asserter_(asserter) {} + + ~ThreadCollisionWarner() { + delete asserter_; + } + + // This class is meant to be used through the macro + // DFAKE_SCOPED_LOCK_THREAD_LOCKED + // it doesn't leave the critical section, as opposed to ScopedCheck, + // because the critical section being pinned is allowed to be used only + // from one thread + class BASE_EXPORT Check { + public: + explicit Check(ThreadCollisionWarner* warner) + : warner_(warner) { + warner_->EnterSelf(); + } + + ~Check() {} + + private: + ThreadCollisionWarner* warner_; + + DISALLOW_COPY_AND_ASSIGN(Check); + }; + + // This class is meant to be used through the macro + // DFAKE_SCOPED_LOCK + class BASE_EXPORT ScopedCheck { + public: + explicit ScopedCheck(ThreadCollisionWarner* warner) + : warner_(warner) { + warner_->Enter(); + } + + ~ScopedCheck() { + warner_->Leave(); + } + + private: + ThreadCollisionWarner* warner_; + + DISALLOW_COPY_AND_ASSIGN(ScopedCheck); + }; + + // This class is meant to be used through the macro + // DFAKE_SCOPED_RECURSIVE_LOCK + class BASE_EXPORT ScopedRecursiveCheck { + public: + explicit ScopedRecursiveCheck(ThreadCollisionWarner* warner) + : warner_(warner) { + warner_->EnterSelf(); + } + + ~ScopedRecursiveCheck() { + warner_->Leave(); + } + + private: + ThreadCollisionWarner* warner_; + + DISALLOW_COPY_AND_ASSIGN(ScopedRecursiveCheck); + }; + + private: + // This method stores the current thread identifier and does a DCHECK + // if a another thread has already done it, it is safe if same thread + // calls this multiple time (recursion allowed). + void EnterSelf(); + + // Same as EnterSelf but recursion is not allowed. + void Enter(); + + // Removes the thread_id stored in order to allow other threads to + // call EnterSelf or Enter. + void Leave(); + + // This stores the thread id that is inside the critical section, if the + // value is 0 then no thread is inside. + volatile subtle::Atomic32 valid_thread_id_; + + // Counter to trace how many time a critical section was "pinned" + // (when allowed) in order to unpin it when counter_ reaches 0. + volatile subtle::Atomic32 counter_; + + // Here only for class unit tests purpose, during the test I need to not + // DCHECK but notify the collision with something else. + AsserterBase* asserter_; + + DISALLOW_COPY_AND_ASSIGN(ThreadCollisionWarner); +}; + +} // namespace base + +#endif // BASE_THREADING_THREAD_COLLISION_WARNER_H_ diff --git a/chromium/base/threading/thread_collision_warner_unittest.cc b/chromium/base/threading/thread_collision_warner_unittest.cc new file mode 100644 index 00000000000..48710a7f3cc --- /dev/null +++ b/chromium/base/threading/thread_collision_warner_unittest.cc @@ -0,0 +1,385 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/compiler_specific.h" +#include "base/memory/scoped_ptr.h" +#include "base/synchronization/lock.h" +#include "base/threading/platform_thread.h" +#include "base/threading/simple_thread.h" +#include "base/threading/thread_collision_warner.h" +#include "testing/gtest/include/gtest/gtest.h" + +// '' : local class member function does not have a body +MSVC_PUSH_DISABLE_WARNING(4822) + + +#if defined(NDEBUG) + +// Would cause a memory leak otherwise. +#undef DFAKE_MUTEX +#define DFAKE_MUTEX(obj) scoped_ptr<base::AsserterBase> obj + +// In Release, we expect the AsserterBase::warn() to not happen. +#define EXPECT_NDEBUG_FALSE_DEBUG_TRUE EXPECT_FALSE + +#else + +// In Debug, we expect the AsserterBase::warn() to happen. +#define EXPECT_NDEBUG_FALSE_DEBUG_TRUE EXPECT_TRUE + +#endif + + +namespace { + +// This is the asserter used with ThreadCollisionWarner instead of the default +// DCheckAsserter. The method fail_state is used to know if a collision took +// place. +class AssertReporter : public base::AsserterBase { + public: + AssertReporter() + : failed_(false) {} + + virtual void warn() OVERRIDE { + failed_ = true; + } + + virtual ~AssertReporter() {} + + bool fail_state() const { return failed_; } + void reset() { failed_ = false; } + + private: + bool failed_; +}; + +} // namespace + +TEST(ThreadCollisionTest, BookCriticalSection) { + AssertReporter* local_reporter = new AssertReporter(); + + base::ThreadCollisionWarner warner(local_reporter); + EXPECT_FALSE(local_reporter->fail_state()); + + { // Pin section. + DFAKE_SCOPED_LOCK_THREAD_LOCKED(warner); + EXPECT_FALSE(local_reporter->fail_state()); + { // Pin section. + DFAKE_SCOPED_LOCK_THREAD_LOCKED(warner); + EXPECT_FALSE(local_reporter->fail_state()); + } + } +} + +TEST(ThreadCollisionTest, ScopedRecursiveBookCriticalSection) { + AssertReporter* local_reporter = new AssertReporter(); + + base::ThreadCollisionWarner warner(local_reporter); + EXPECT_FALSE(local_reporter->fail_state()); + + { // Pin section. + DFAKE_SCOPED_RECURSIVE_LOCK(warner); + EXPECT_FALSE(local_reporter->fail_state()); + { // Pin section again (allowed by DFAKE_SCOPED_RECURSIVE_LOCK) + DFAKE_SCOPED_RECURSIVE_LOCK(warner); + EXPECT_FALSE(local_reporter->fail_state()); + } // Unpin section. + } // Unpin section. + + // Check that section is not pinned + { // Pin section. + DFAKE_SCOPED_LOCK(warner); + EXPECT_FALSE(local_reporter->fail_state()); + } // Unpin section. +} + +TEST(ThreadCollisionTest, ScopedBookCriticalSection) { + AssertReporter* local_reporter = new AssertReporter(); + + base::ThreadCollisionWarner warner(local_reporter); + EXPECT_FALSE(local_reporter->fail_state()); + + { // Pin section. + DFAKE_SCOPED_LOCK(warner); + EXPECT_FALSE(local_reporter->fail_state()); + } // Unpin section. + + { // Pin section. + DFAKE_SCOPED_LOCK(warner); + EXPECT_FALSE(local_reporter->fail_state()); + { + // Pin section again (not allowed by DFAKE_SCOPED_LOCK) + DFAKE_SCOPED_LOCK(warner); + EXPECT_NDEBUG_FALSE_DEBUG_TRUE(local_reporter->fail_state()); + // Reset the status of warner for further tests. + local_reporter->reset(); + } // Unpin section. + } // Unpin section. + + { + // Pin section. + DFAKE_SCOPED_LOCK(warner); + EXPECT_FALSE(local_reporter->fail_state()); + } // Unpin section. +} + +TEST(ThreadCollisionTest, MTBookCriticalSectionTest) { + class NonThreadSafeQueue { + public: + explicit NonThreadSafeQueue(base::AsserterBase* asserter) + : push_pop_(asserter) { + } + + void push(int value) { + DFAKE_SCOPED_LOCK_THREAD_LOCKED(push_pop_); + } + + int pop() { + DFAKE_SCOPED_LOCK_THREAD_LOCKED(push_pop_); + return 0; + } + + private: + DFAKE_MUTEX(push_pop_); + + DISALLOW_COPY_AND_ASSIGN(NonThreadSafeQueue); + }; + + class QueueUser : public base::DelegateSimpleThread::Delegate { + public: + explicit QueueUser(NonThreadSafeQueue& queue) + : queue_(queue) {} + + virtual void Run() OVERRIDE { + queue_.push(0); + queue_.pop(); + } + + private: + NonThreadSafeQueue& queue_; + }; + + AssertReporter* local_reporter = new AssertReporter(); + + NonThreadSafeQueue queue(local_reporter); + + QueueUser queue_user_a(queue); + QueueUser queue_user_b(queue); + + base::DelegateSimpleThread thread_a(&queue_user_a, "queue_user_thread_a"); + base::DelegateSimpleThread thread_b(&queue_user_b, "queue_user_thread_b"); + + thread_a.Start(); + thread_b.Start(); + + thread_a.Join(); + thread_b.Join(); + + EXPECT_NDEBUG_FALSE_DEBUG_TRUE(local_reporter->fail_state()); +} + +TEST(ThreadCollisionTest, MTScopedBookCriticalSectionTest) { + // Queue with a 5 seconds push execution time, hopefuly the two used threads + // in the test will enter the push at same time. + class NonThreadSafeQueue { + public: + explicit NonThreadSafeQueue(base::AsserterBase* asserter) + : push_pop_(asserter) { + } + + void push(int value) { + DFAKE_SCOPED_LOCK(push_pop_); + base::PlatformThread::Sleep(base::TimeDelta::FromSeconds(5)); + } + + int pop() { + DFAKE_SCOPED_LOCK(push_pop_); + return 0; + } + + private: + DFAKE_MUTEX(push_pop_); + + DISALLOW_COPY_AND_ASSIGN(NonThreadSafeQueue); + }; + + class QueueUser : public base::DelegateSimpleThread::Delegate { + public: + explicit QueueUser(NonThreadSafeQueue& queue) + : queue_(queue) {} + + virtual void Run() OVERRIDE { + queue_.push(0); + queue_.pop(); + } + + private: + NonThreadSafeQueue& queue_; + }; + + AssertReporter* local_reporter = new AssertReporter(); + + NonThreadSafeQueue queue(local_reporter); + + QueueUser queue_user_a(queue); + QueueUser queue_user_b(queue); + + base::DelegateSimpleThread thread_a(&queue_user_a, "queue_user_thread_a"); + base::DelegateSimpleThread thread_b(&queue_user_b, "queue_user_thread_b"); + + thread_a.Start(); + thread_b.Start(); + + thread_a.Join(); + thread_b.Join(); + + EXPECT_NDEBUG_FALSE_DEBUG_TRUE(local_reporter->fail_state()); +} + +TEST(ThreadCollisionTest, MTSynchedScopedBookCriticalSectionTest) { + // Queue with a 2 seconds push execution time, hopefuly the two used threads + // in the test will enter the push at same time. + class NonThreadSafeQueue { + public: + explicit NonThreadSafeQueue(base::AsserterBase* asserter) + : push_pop_(asserter) { + } + + void push(int value) { + DFAKE_SCOPED_LOCK(push_pop_); + base::PlatformThread::Sleep(base::TimeDelta::FromSeconds(2)); + } + + int pop() { + DFAKE_SCOPED_LOCK(push_pop_); + return 0; + } + + private: + DFAKE_MUTEX(push_pop_); + + DISALLOW_COPY_AND_ASSIGN(NonThreadSafeQueue); + }; + + // This time the QueueUser class protects the non thread safe queue with + // a lock. + class QueueUser : public base::DelegateSimpleThread::Delegate { + public: + QueueUser(NonThreadSafeQueue& queue, base::Lock& lock) + : queue_(queue), + lock_(lock) {} + + virtual void Run() OVERRIDE { + { + base::AutoLock auto_lock(lock_); + queue_.push(0); + } + { + base::AutoLock auto_lock(lock_); + queue_.pop(); + } + } + private: + NonThreadSafeQueue& queue_; + base::Lock& lock_; + }; + + AssertReporter* local_reporter = new AssertReporter(); + + NonThreadSafeQueue queue(local_reporter); + + base::Lock lock; + + QueueUser queue_user_a(queue, lock); + QueueUser queue_user_b(queue, lock); + + base::DelegateSimpleThread thread_a(&queue_user_a, "queue_user_thread_a"); + base::DelegateSimpleThread thread_b(&queue_user_b, "queue_user_thread_b"); + + thread_a.Start(); + thread_b.Start(); + + thread_a.Join(); + thread_b.Join(); + + EXPECT_FALSE(local_reporter->fail_state()); +} + +TEST(ThreadCollisionTest, MTSynchedScopedRecursiveBookCriticalSectionTest) { + // Queue with a 2 seconds push execution time, hopefuly the two used threads + // in the test will enter the push at same time. + class NonThreadSafeQueue { + public: + explicit NonThreadSafeQueue(base::AsserterBase* asserter) + : push_pop_(asserter) { + } + + void push(int) { + DFAKE_SCOPED_RECURSIVE_LOCK(push_pop_); + bar(); + base::PlatformThread::Sleep(base::TimeDelta::FromSeconds(2)); + } + + int pop() { + DFAKE_SCOPED_RECURSIVE_LOCK(push_pop_); + return 0; + } + + void bar() { + DFAKE_SCOPED_RECURSIVE_LOCK(push_pop_); + } + + private: + DFAKE_MUTEX(push_pop_); + + DISALLOW_COPY_AND_ASSIGN(NonThreadSafeQueue); + }; + + // This time the QueueUser class protects the non thread safe queue with + // a lock. + class QueueUser : public base::DelegateSimpleThread::Delegate { + public: + QueueUser(NonThreadSafeQueue& queue, base::Lock& lock) + : queue_(queue), + lock_(lock) {} + + virtual void Run() OVERRIDE { + { + base::AutoLock auto_lock(lock_); + queue_.push(0); + } + { + base::AutoLock auto_lock(lock_); + queue_.bar(); + } + { + base::AutoLock auto_lock(lock_); + queue_.pop(); + } + } + private: + NonThreadSafeQueue& queue_; + base::Lock& lock_; + }; + + AssertReporter* local_reporter = new AssertReporter(); + + NonThreadSafeQueue queue(local_reporter); + + base::Lock lock; + + QueueUser queue_user_a(queue, lock); + QueueUser queue_user_b(queue, lock); + + base::DelegateSimpleThread thread_a(&queue_user_a, "queue_user_thread_a"); + base::DelegateSimpleThread thread_b(&queue_user_b, "queue_user_thread_b"); + + thread_a.Start(); + thread_b.Start(); + + thread_a.Join(); + thread_b.Join(); + + EXPECT_FALSE(local_reporter->fail_state()); +} diff --git a/chromium/base/threading/thread_id_name_manager.cc b/chromium/base/threading/thread_id_name_manager.cc new file mode 100644 index 00000000000..ef08548f2ed --- /dev/null +++ b/chromium/base/threading/thread_id_name_manager.cc @@ -0,0 +1,112 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/thread_id_name_manager.h" + +#include <stdlib.h> +#include <string.h> + +#include "base/logging.h" +#include "base/memory/singleton.h" +#include "base/strings/string_util.h" + +namespace base { +namespace { + +static const char kDefaultName[] = ""; +static std::string* g_default_name; + +} + +ThreadIdNameManager::ThreadIdNameManager() + : main_process_id_(kInvalidThreadId) { + g_default_name = new std::string(kDefaultName); + + AutoLock locked(lock_); + name_to_interned_name_[kDefaultName] = g_default_name; +} + +ThreadIdNameManager::~ThreadIdNameManager() { +} + +ThreadIdNameManager* ThreadIdNameManager::GetInstance() { + return Singleton<ThreadIdNameManager, + LeakySingletonTraits<ThreadIdNameManager> >::get(); +} + +const char* ThreadIdNameManager::GetDefaultInternedString() { + return g_default_name->c_str(); +} + +void ThreadIdNameManager::RegisterThread(PlatformThreadHandle::Handle handle, + PlatformThreadId id) { + AutoLock locked(lock_); + thread_id_to_handle_[id] = handle; + thread_handle_to_interned_name_[handle] = + name_to_interned_name_[kDefaultName]; +} + +void ThreadIdNameManager::SetName(PlatformThreadId id, const char* name) { + std::string str_name(name); + + AutoLock locked(lock_); + NameToInternedNameMap::iterator iter = name_to_interned_name_.find(str_name); + std::string* leaked_str = NULL; + if (iter != name_to_interned_name_.end()) { + leaked_str = iter->second; + } else { + leaked_str = new std::string(str_name); + name_to_interned_name_[str_name] = leaked_str; + } + + ThreadIdToHandleMap::iterator id_to_handle_iter = + thread_id_to_handle_.find(id); + + // The main thread of a process will not be created as a Thread object which + // means there is no PlatformThreadHandler registered. + if (id_to_handle_iter == thread_id_to_handle_.end()) { + main_process_name_ = leaked_str; + main_process_id_ = id; + return; + } + thread_handle_to_interned_name_[id_to_handle_iter->second] = leaked_str; +} + +const char* ThreadIdNameManager::GetName(PlatformThreadId id) { + AutoLock locked(lock_); + + if (id == main_process_id_) + return main_process_name_->c_str(); + + ThreadIdToHandleMap::iterator id_to_handle_iter = + thread_id_to_handle_.find(id); + if (id_to_handle_iter == thread_id_to_handle_.end()) + return name_to_interned_name_[kDefaultName]->c_str(); + + ThreadHandleToInternedNameMap::iterator handle_to_name_iter = + thread_handle_to_interned_name_.find(id_to_handle_iter->second); + return handle_to_name_iter->second->c_str(); +} + +void ThreadIdNameManager::RemoveName(PlatformThreadHandle::Handle handle, + PlatformThreadId id) { + AutoLock locked(lock_); + ThreadHandleToInternedNameMap::iterator handle_to_name_iter = + thread_handle_to_interned_name_.find(handle); + + DCHECK(handle_to_name_iter != thread_handle_to_interned_name_.end()); + thread_handle_to_interned_name_.erase(handle_to_name_iter); + + ThreadIdToHandleMap::iterator id_to_handle_iter = + thread_id_to_handle_.find(id); + DCHECK((id_to_handle_iter!= thread_id_to_handle_.end())); + // The given |id| may have been re-used by the system. Make sure the + // mapping points to the provided |handle| before removal. + if (id_to_handle_iter->second != handle) + return; + + thread_id_to_handle_.erase(id_to_handle_iter); +} + +} // namespace base diff --git a/chromium/base/threading/thread_id_name_manager.h b/chromium/base/threading/thread_id_name_manager.h new file mode 100644 index 00000000000..0ea59df6572 --- /dev/null +++ b/chromium/base/threading/thread_id_name_manager.h @@ -0,0 +1,67 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef BASE_THREADING_THREAD_ID_NAME_MANAGER_H_ +#define BASE_THREADING_THREAD_ID_NAME_MANAGER_H_ + +#include <map> +#include <string> + +#include "base/base_export.h" +#include "base/basictypes.h" +#include "base/synchronization/lock.h" +#include "base/threading/platform_thread.h" + +template <typename T> struct DefaultSingletonTraits; + +namespace base { + +class BASE_EXPORT ThreadIdNameManager { + public: + static ThreadIdNameManager* GetInstance(); + + static const char* GetDefaultInternedString(); + + // Register the mapping between a thread |id| and |handle|. + void RegisterThread(PlatformThreadHandle::Handle handle, PlatformThreadId id); + + // Set the name for the given id. + void SetName(PlatformThreadId id, const char* name); + + // Get the name for the given id. + const char* GetName(PlatformThreadId id); + + // Remove the name for the given id. + void RemoveName(PlatformThreadHandle::Handle handle, PlatformThreadId id); + + private: + friend struct DefaultSingletonTraits<ThreadIdNameManager>; + + typedef std::map<PlatformThreadId, PlatformThreadHandle::Handle> + ThreadIdToHandleMap; + typedef std::map<PlatformThreadHandle::Handle, std::string*> + ThreadHandleToInternedNameMap; + typedef std::map<std::string, std::string*> NameToInternedNameMap; + + ThreadIdNameManager(); + ~ThreadIdNameManager(); + + // lock_ protects the name_to_interned_name_, thread_id_to_handle_ and + // thread_handle_to_interned_name_ maps. + Lock lock_; + + NameToInternedNameMap name_to_interned_name_; + ThreadIdToHandleMap thread_id_to_handle_; + ThreadHandleToInternedNameMap thread_handle_to_interned_name_; + + // Treat the main process specially as there is no PlatformThreadHandle. + std::string* main_process_name_; + PlatformThreadId main_process_id_; + + DISALLOW_COPY_AND_ASSIGN(ThreadIdNameManager); +}; + +} // namespace base + +#endif // BASE_THREADING_THREAD_ID_NAME_MANAGER_H_ diff --git a/chromium/base/threading/thread_id_name_manager_unittest.cc b/chromium/base/threading/thread_id_name_manager_unittest.cc new file mode 100644 index 00000000000..37773018a0e --- /dev/null +++ b/chromium/base/threading/thread_id_name_manager_unittest.cc @@ -0,0 +1,93 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/thread_id_name_manager.h" + +#include "base/threading/platform_thread.h" +#include "base/threading/thread.h" +#include "testing/gtest/include/gtest/gtest.h" +#include "testing/platform_test.h" + +typedef PlatformTest ThreadIdNameManagerTest; + +namespace { + +static const char* kAThread = "a thread"; +static const char* kBThread = "b thread"; + +TEST_F(ThreadIdNameManagerTest, AddThreads) { + base::ThreadIdNameManager* manager = base::ThreadIdNameManager::GetInstance(); + base::Thread thread_a(kAThread); + base::Thread thread_b(kBThread); + + thread_a.Start(); + thread_b.Start(); + + EXPECT_STREQ(kAThread, manager->GetName(thread_a.thread_id())); + EXPECT_STREQ(kBThread, manager->GetName(thread_b.thread_id())); + + thread_b.Stop(); + thread_a.Stop(); +} + +TEST_F(ThreadIdNameManagerTest, RemoveThreads) { + base::ThreadIdNameManager* manager = base::ThreadIdNameManager::GetInstance(); + base::Thread thread_a(kAThread); + + thread_a.Start(); + { + base::Thread thread_b(kBThread); + thread_b.Start(); + thread_b.Stop(); + } + EXPECT_STREQ(kAThread, manager->GetName(thread_a.thread_id())); + + thread_a.Stop(); + EXPECT_STREQ("", manager->GetName(thread_a.thread_id())); +} + +TEST_F(ThreadIdNameManagerTest, RestartThread) { + base::ThreadIdNameManager* manager = base::ThreadIdNameManager::GetInstance(); + base::Thread thread_a(kAThread); + + thread_a.Start(); + base::PlatformThreadId a_id = thread_a.thread_id(); + EXPECT_STREQ(kAThread, manager->GetName(a_id)); + thread_a.Stop(); + + thread_a.Start(); + EXPECT_STREQ("", manager->GetName(a_id)); + EXPECT_STREQ(kAThread, manager->GetName(thread_a.thread_id())); + thread_a.Stop(); +} + +TEST_F(ThreadIdNameManagerTest, ThreadNameInterning) { + base::ThreadIdNameManager* manager = base::ThreadIdNameManager::GetInstance(); + + base::PlatformThreadId a_id = base::PlatformThread::CurrentId(); + base::PlatformThread::SetName("First Name"); + std::string version = manager->GetName(a_id); + + base::PlatformThread::SetName("New name"); + EXPECT_NE(version, manager->GetName(a_id)); + base::PlatformThread::SetName(""); +} + +TEST_F(ThreadIdNameManagerTest, ResettingNameKeepsCorrectInternedValue) { + base::ThreadIdNameManager* manager = base::ThreadIdNameManager::GetInstance(); + + base::PlatformThreadId a_id = base::PlatformThread::CurrentId(); + base::PlatformThread::SetName("Test Name"); + std::string version = manager->GetName(a_id); + + base::PlatformThread::SetName("New name"); + EXPECT_NE(version, manager->GetName(a_id)); + + base::PlatformThread::SetName("Test Name"); + EXPECT_EQ(version, manager->GetName(a_id)); + + base::PlatformThread::SetName(""); +} + +} // namespace diff --git a/chromium/base/threading/thread_local.h b/chromium/base/threading/thread_local.h new file mode 100644 index 00000000000..6561420758c --- /dev/null +++ b/chromium/base/threading/thread_local.h @@ -0,0 +1,128 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// WARNING: Thread local storage is a bit tricky to get right. Please make +// sure that this is really the proper solution for what you're trying to +// achieve. Don't prematurely optimize, most likely you can just use a Lock. +// +// These classes implement a wrapper around the platform's TLS storage +// mechanism. On construction, they will allocate a TLS slot, and free the +// TLS slot on destruction. No memory management (creation or destruction) is +// handled. This means for uses of ThreadLocalPointer, you must correctly +// manage the memory yourself, these classes will not destroy the pointer for +// you. There are no at-thread-exit actions taken by these classes. +// +// ThreadLocalPointer<Type> wraps a Type*. It performs no creation or +// destruction, so memory management must be handled elsewhere. The first call +// to Get() on a thread will return NULL. You can update the pointer with a +// call to Set(). +// +// ThreadLocalBoolean wraps a bool. It will default to false if it has never +// been set otherwise with Set(). +// +// Thread Safety: An instance of ThreadLocalStorage is completely thread safe +// once it has been created. If you want to dynamically create an instance, +// you must of course properly deal with safety and race conditions. This +// means a function-level static initializer is generally inappropiate. +// +// Example usage: +// // My class is logically attached to a single thread. We cache a pointer +// // on the thread it was created on, so we can implement current(). +// MyClass::MyClass() { +// DCHECK(Singleton<ThreadLocalPointer<MyClass> >::get()->Get() == NULL); +// Singleton<ThreadLocalPointer<MyClass> >::get()->Set(this); +// } +// +// MyClass::~MyClass() { +// DCHECK(Singleton<ThreadLocalPointer<MyClass> >::get()->Get() != NULL); +// Singleton<ThreadLocalPointer<MyClass> >::get()->Set(NULL); +// } +// +// // Return the current MyClass associated with the calling thread, can be +// // NULL if there isn't a MyClass associated. +// MyClass* MyClass::current() { +// return Singleton<ThreadLocalPointer<MyClass> >::get()->Get(); +// } + +#ifndef BASE_THREADING_THREAD_LOCAL_H_ +#define BASE_THREADING_THREAD_LOCAL_H_ + +#include "base/base_export.h" +#include "base/basictypes.h" + +#if defined(OS_POSIX) +#include <pthread.h> +#endif + +namespace base { + +namespace internal { + +// Helper functions that abstract the cross-platform APIs. Do not use directly. +struct BASE_EXPORT ThreadLocalPlatform { +#if defined(OS_WIN) + typedef unsigned long SlotType; +#elif defined(OS_POSIX) + typedef pthread_key_t SlotType; +#endif + + static void AllocateSlot(SlotType& slot); + static void FreeSlot(SlotType& slot); + static void* GetValueFromSlot(SlotType& slot); + static void SetValueInSlot(SlotType& slot, void* value); +}; + +} // namespace internal + +template <typename Type> +class ThreadLocalPointer { + public: + ThreadLocalPointer() : slot_() { + internal::ThreadLocalPlatform::AllocateSlot(slot_); + } + + ~ThreadLocalPointer() { + internal::ThreadLocalPlatform::FreeSlot(slot_); + } + + Type* Get() { + return static_cast<Type*>( + internal::ThreadLocalPlatform::GetValueFromSlot(slot_)); + } + + void Set(Type* ptr) { + internal::ThreadLocalPlatform::SetValueInSlot( + slot_, const_cast<void*>(static_cast<const void*>(ptr))); + } + + private: + typedef internal::ThreadLocalPlatform::SlotType SlotType; + + SlotType slot_; + + DISALLOW_COPY_AND_ASSIGN(ThreadLocalPointer<Type>); +}; + +class ThreadLocalBoolean { + public: + ThreadLocalBoolean() { } + ~ThreadLocalBoolean() { } + + bool Get() { + return tlp_.Get() != NULL; + } + + void Set(bool val) { + tlp_.Set(val ? this : NULL); + } + + private: + ThreadLocalPointer<void> tlp_; + + DISALLOW_COPY_AND_ASSIGN(ThreadLocalBoolean); +}; + +} // namespace base + +#endif // BASE_THREADING_THREAD_LOCAL_H_ diff --git a/chromium/base/threading/thread_local_posix.cc b/chromium/base/threading/thread_local_posix.cc new file mode 100644 index 00000000000..49510064e75 --- /dev/null +++ b/chromium/base/threading/thread_local_posix.cc @@ -0,0 +1,40 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/thread_local.h" + +#include <pthread.h> + +#include "base/logging.h" + +namespace base { + +namespace internal { + +// static +void ThreadLocalPlatform::AllocateSlot(SlotType& slot) { + int error = pthread_key_create(&slot, NULL); + CHECK_EQ(error, 0); +} + +// static +void ThreadLocalPlatform::FreeSlot(SlotType& slot) { + int error = pthread_key_delete(slot); + DCHECK_EQ(0, error); +} + +// static +void* ThreadLocalPlatform::GetValueFromSlot(SlotType& slot) { + return pthread_getspecific(slot); +} + +// static +void ThreadLocalPlatform::SetValueInSlot(SlotType& slot, void* value) { + int error = pthread_setspecific(slot, value); + DCHECK_EQ(error, 0); +} + +} // namespace internal + +} // namespace base diff --git a/chromium/base/threading/thread_local_storage.h b/chromium/base/threading/thread_local_storage.h new file mode 100644 index 00000000000..eb5648f76c9 --- /dev/null +++ b/chromium/base/threading/thread_local_storage.h @@ -0,0 +1,93 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef BASE_THREADING_THREAD_LOCAL_STORAGE_H_ +#define BASE_THREADING_THREAD_LOCAL_STORAGE_H_ + +#include "base/base_export.h" +#include "base/basictypes.h" + +#if defined(OS_POSIX) +#include <pthread.h> +#endif + +namespace base { + +// Wrapper for thread local storage. This class doesn't do much except provide +// an API for portability. +class BASE_EXPORT ThreadLocalStorage { + public: + + // Prototype for the TLS destructor function, which can be optionally used to + // cleanup thread local storage on thread exit. 'value' is the data that is + // stored in thread local storage. + typedef void (*TLSDestructorFunc)(void* value); + + // StaticSlot uses its own struct initializer-list style static + // initialization, as base's LINKER_INITIALIZED requires a constructor and on + // some compilers (notably gcc 4.4) this still ends up needing runtime + // initialization. + #define TLS_INITIALIZER {0} + + // A key representing one value stored in TLS. + // Initialize like + // ThreadLocalStorage::StaticSlot my_slot = TLS_INITIALIZER; + // If you're not using a static variable, use the convenience class + // ThreadLocalStorage::Slot (below) instead. + struct BASE_EXPORT StaticSlot { + // Set up the TLS slot. Called by the constructor. + // 'destructor' is a pointer to a function to perform per-thread cleanup of + // this object. If set to NULL, no cleanup is done for this TLS slot. + // Returns false on error. + bool Initialize(TLSDestructorFunc destructor); + + // Free a previously allocated TLS 'slot'. + // If a destructor was set for this slot, removes + // the destructor so that remaining threads exiting + // will not free data. + void Free(); + + // Get the thread-local value stored in slot 'slot'. + // Values are guaranteed to initially be zero. + void* Get() const; + + // Set the thread-local value stored in slot 'slot' to + // value 'value'. + void Set(void* value); + + bool initialized() const { return initialized_; } + + // The internals of this struct should be considered private. + bool initialized_; +#if defined(OS_WIN) + int slot_; +#elif defined(OS_POSIX) + pthread_key_t key_; +#endif + + }; + + // A convenience wrapper around StaticSlot with a constructor. Can be used + // as a member variable. + class BASE_EXPORT Slot : public StaticSlot { + public: + // Calls StaticSlot::Initialize(). + explicit Slot(TLSDestructorFunc destructor = NULL); + + private: + using StaticSlot::initialized_; +#if defined(OS_WIN) + using StaticSlot::slot_; +#elif defined(OS_POSIX) + using StaticSlot::key_; +#endif + DISALLOW_COPY_AND_ASSIGN(Slot); + }; + + DISALLOW_COPY_AND_ASSIGN(ThreadLocalStorage); +}; + +} // namespace base + +#endif // BASE_THREADING_THREAD_LOCAL_STORAGE_H_ diff --git a/chromium/base/threading/thread_local_storage_posix.cc b/chromium/base/threading/thread_local_storage_posix.cc new file mode 100644 index 00000000000..75da5a7d8f0 --- /dev/null +++ b/chromium/base/threading/thread_local_storage_posix.cc @@ -0,0 +1,49 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/thread_local_storage.h" + +#include "base/logging.h" + +namespace base { + +ThreadLocalStorage::Slot::Slot(TLSDestructorFunc destructor) { + initialized_ = false; + key_ = 0; + Initialize(destructor); +} + +bool ThreadLocalStorage::StaticSlot::Initialize(TLSDestructorFunc destructor) { + DCHECK(!initialized_); + int error = pthread_key_create(&key_, destructor); + if (error) { + NOTREACHED(); + return false; + } + + initialized_ = true; + return true; +} + +void ThreadLocalStorage::StaticSlot::Free() { + DCHECK(initialized_); + int error = pthread_key_delete(key_); + if (error) + NOTREACHED(); + initialized_ = false; +} + +void* ThreadLocalStorage::StaticSlot::Get() const { + DCHECK(initialized_); + return pthread_getspecific(key_); +} + +void ThreadLocalStorage::StaticSlot::Set(void* value) { + DCHECK(initialized_); + int error = pthread_setspecific(key_, value); + if (error) + NOTREACHED(); +} + +} // namespace base diff --git a/chromium/base/threading/thread_local_storage_unittest.cc b/chromium/base/threading/thread_local_storage_unittest.cc new file mode 100644 index 00000000000..e295c89426d --- /dev/null +++ b/chromium/base/threading/thread_local_storage_unittest.cc @@ -0,0 +1,124 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#if defined(OS_WIN) +#include <windows.h> +#include <process.h> +#endif + +#include "base/threading/simple_thread.h" +#include "base/threading/thread_local_storage.h" +#include "testing/gtest/include/gtest/gtest.h" + +#if defined(OS_WIN) +// Ignore warnings about ptr->int conversions that we use when +// storing ints into ThreadLocalStorage. +#pragma warning(disable : 4311 4312) +#endif + +namespace base { + +namespace { + +const int kInitialTlsValue = 0x5555; +const int kFinalTlsValue = 0x7777; +// How many times must a destructor be called before we really are done. +const int kNumberDestructorCallRepetitions = 3; + +static ThreadLocalStorage::StaticSlot tls_slot = TLS_INITIALIZER; + +class ThreadLocalStorageRunner : public DelegateSimpleThread::Delegate { + public: + explicit ThreadLocalStorageRunner(int* tls_value_ptr) + : tls_value_ptr_(tls_value_ptr) {} + + virtual ~ThreadLocalStorageRunner() {} + + virtual void Run() OVERRIDE { + *tls_value_ptr_ = kInitialTlsValue; + tls_slot.Set(tls_value_ptr_); + + int *ptr = static_cast<int*>(tls_slot.Get()); + EXPECT_EQ(ptr, tls_value_ptr_); + EXPECT_EQ(*ptr, kInitialTlsValue); + *tls_value_ptr_ = 0; + + ptr = static_cast<int*>(tls_slot.Get()); + EXPECT_EQ(ptr, tls_value_ptr_); + EXPECT_EQ(*ptr, 0); + + *ptr = kFinalTlsValue + kNumberDestructorCallRepetitions; + } + + private: + int* tls_value_ptr_; + DISALLOW_COPY_AND_ASSIGN(ThreadLocalStorageRunner); +}; + + +void ThreadLocalStorageCleanup(void *value) { + int *ptr = reinterpret_cast<int*>(value); + // Destructors should never be called with a NULL. + ASSERT_NE(reinterpret_cast<int*>(NULL), ptr); + if (*ptr == kFinalTlsValue) + return; // We've been called enough times. + ASSERT_LT(kFinalTlsValue, *ptr); + ASSERT_GE(kFinalTlsValue + kNumberDestructorCallRepetitions, *ptr); + --*ptr; // Move closer to our target. + // Tell tls that we're not done with this thread, and still need destruction. + tls_slot.Set(value); +} + +} // namespace + +TEST(ThreadLocalStorageTest, Basics) { + ThreadLocalStorage::Slot slot; + slot.Set(reinterpret_cast<void*>(123)); + int value = reinterpret_cast<intptr_t>(slot.Get()); + EXPECT_EQ(value, 123); +} + +#if defined(THREAD_SANITIZER) +// Do not run the test under ThreadSanitizer. Because this test iterates its +// own TSD destructor for the maximum possible number of times, TSan can't jump +// in after the last destructor invocation, therefore the destructor remains +// unsynchronized with the following users of the same TSD slot. This results +// in race reports between the destructor and functions in other tests. +#define MAYBE_TLSDestructors DISABLED_TLSDestructors +#else +#define MAYBE_TLSDestructors TLSDestructors +#endif +TEST(ThreadLocalStorageTest, MAYBE_TLSDestructors) { + // Create a TLS index with a destructor. Create a set of + // threads that set the TLS, while the destructor cleans it up. + // After the threads finish, verify that the value is cleaned up. + const int kNumThreads = 5; + int values[kNumThreads]; + ThreadLocalStorageRunner* thread_delegates[kNumThreads]; + DelegateSimpleThread* threads[kNumThreads]; + + tls_slot.Initialize(ThreadLocalStorageCleanup); + + // Spawn the threads. + for (int index = 0; index < kNumThreads; index++) { + values[index] = kInitialTlsValue; + thread_delegates[index] = new ThreadLocalStorageRunner(&values[index]); + threads[index] = new DelegateSimpleThread(thread_delegates[index], + "tls thread"); + threads[index]->Start(); + } + + // Wait for the threads to finish. + for (int index = 0; index < kNumThreads; index++) { + threads[index]->Join(); + delete threads[index]; + delete thread_delegates[index]; + + // Verify that the destructor was called and that we reset. + EXPECT_EQ(values[index], kFinalTlsValue); + } + tls_slot.Free(); // Stop doing callbacks to cleanup threads. +} + +} // namespace base diff --git a/chromium/base/threading/thread_local_storage_win.cc b/chromium/base/threading/thread_local_storage_win.cc new file mode 100644 index 00000000000..0ae3cb4c8cd --- /dev/null +++ b/chromium/base/threading/thread_local_storage_win.cc @@ -0,0 +1,277 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/thread_local_storage.h" + +#include <windows.h> + +#include "base/logging.h" + + +namespace { +// In order to make TLS destructors work, we need to keep function +// pointers to the destructor for each TLS that we allocate. +// We make this work by allocating a single OS-level TLS, which +// contains an array of slots for the application to use. In +// parallel, we also allocate an array of destructors, which we +// keep track of and call when threads terminate. + +// g_native_tls_key is the one native TLS that we use. It stores our table. +long g_native_tls_key = TLS_OUT_OF_INDEXES; + +// g_last_used_tls_key is the high-water-mark of allocated thread local storage. +// Each allocation is an index into our g_tls_destructors[]. Each such index is +// assigned to the instance variable slot_ in a ThreadLocalStorage::Slot +// instance. We reserve the value slot_ == 0 to indicate that the corresponding +// instance of ThreadLocalStorage::Slot has been freed (i.e., destructor called, +// etc.). This reserved use of 0 is then stated as the initial value of +// g_last_used_tls_key, so that the first issued index will be 1. +long g_last_used_tls_key = 0; + +// The maximum number of 'slots' in our thread local storage stack. +const int kThreadLocalStorageSize = 64; + +// The maximum number of times to try to clear slots by calling destructors. +// Use pthread naming convention for clarity. +const int kMaxDestructorIterations = kThreadLocalStorageSize; + +// An array of destructor function pointers for the slots. If a slot has a +// destructor, it will be stored in its corresponding entry in this array. +// The elements are volatile to ensure that when the compiler reads the value +// to potentially call the destructor, it does so once, and that value is tested +// for null-ness and then used. Yes, that would be a weird de-optimization, +// but I can imagine some register machines where it was just as easy to +// re-fetch an array element, and I want to be sure a call to free the key +// (i.e., null out the destructor entry) that happens on a separate thread can't +// hurt the racy calls to the destructors on another thread. +volatile base::ThreadLocalStorage::TLSDestructorFunc + g_tls_destructors[kThreadLocalStorageSize]; + +void** ConstructTlsVector() { + if (g_native_tls_key == TLS_OUT_OF_INDEXES) { + long value = TlsAlloc(); + DCHECK(value != TLS_OUT_OF_INDEXES); + + // Atomically test-and-set the tls_key. If the key is TLS_OUT_OF_INDEXES, + // go ahead and set it. Otherwise, do nothing, as another + // thread already did our dirty work. + if (TLS_OUT_OF_INDEXES != InterlockedCompareExchange( + &g_native_tls_key, value, TLS_OUT_OF_INDEXES)) { + // We've been shortcut. Another thread replaced g_native_tls_key first so + // we need to destroy our index and use the one the other thread got + // first. + TlsFree(value); + } + } + DCHECK(!TlsGetValue(g_native_tls_key)); + + // Some allocators, such as TCMalloc, make use of thread local storage. + // As a result, any attempt to call new (or malloc) will lazily cause such a + // system to initialize, which will include registering for a TLS key. If we + // are not careful here, then that request to create a key will call new back, + // and we'll have an infinite loop. We avoid that as follows: + // Use a stack allocated vector, so that we don't have dependence on our + // allocator until our service is in place. (i.e., don't even call new until + // after we're setup) + void* stack_allocated_tls_data[kThreadLocalStorageSize]; + memset(stack_allocated_tls_data, 0, sizeof(stack_allocated_tls_data)); + // Ensure that any rentrant calls change the temp version. + TlsSetValue(g_native_tls_key, stack_allocated_tls_data); + + // Allocate an array to store our data. + void** tls_data = new void*[kThreadLocalStorageSize]; + memcpy(tls_data, stack_allocated_tls_data, sizeof(stack_allocated_tls_data)); + TlsSetValue(g_native_tls_key, tls_data); + return tls_data; +} + +// Called when we terminate a thread, this function calls any TLS destructors +// that are pending for this thread. +void WinThreadExit() { + if (g_native_tls_key == TLS_OUT_OF_INDEXES) + return; + + void** tls_data = static_cast<void**>(TlsGetValue(g_native_tls_key)); + // Maybe we have never initialized TLS for this thread. + if (!tls_data) + return; + + // Some allocators, such as TCMalloc, use TLS. As a result, when a thread + // terminates, one of the destructor calls we make may be to shut down an + // allocator. We have to be careful that after we've shutdown all of the + // known destructors (perchance including an allocator), that we don't call + // the allocator and cause it to resurrect itself (with no possibly destructor + // call to follow). We handle this problem as follows: + // Switch to using a stack allocated vector, so that we don't have dependence + // on our allocator after we have called all g_tls_destructors. (i.e., don't + // even call delete[] after we're done with destructors.) + void* stack_allocated_tls_data[kThreadLocalStorageSize]; + memcpy(stack_allocated_tls_data, tls_data, sizeof(stack_allocated_tls_data)); + // Ensure that any re-entrant calls change the temp version. + TlsSetValue(g_native_tls_key, stack_allocated_tls_data); + delete[] tls_data; // Our last dependence on an allocator. + + int remaining_attempts = kMaxDestructorIterations; + bool need_to_scan_destructors = true; + while (need_to_scan_destructors) { + need_to_scan_destructors = false; + // Try to destroy the first-created-slot (which is slot 1) in our last + // destructor call. That user was able to function, and define a slot with + // no other services running, so perhaps it is a basic service (like an + // allocator) and should also be destroyed last. If we get the order wrong, + // then we'll itterate several more times, so it is really not that + // critical (but it might help). + for (int slot = g_last_used_tls_key; slot > 0; --slot) { + void* value = stack_allocated_tls_data[slot]; + if (value == NULL) + continue; + base::ThreadLocalStorage::TLSDestructorFunc destructor = + g_tls_destructors[slot]; + if (destructor == NULL) + continue; + stack_allocated_tls_data[slot] = NULL; // pre-clear the slot. + destructor(value); + // Any destructor might have called a different service, which then set + // a different slot to a non-NULL value. Hence we need to check + // the whole vector again. This is a pthread standard. + need_to_scan_destructors = true; + } + if (--remaining_attempts <= 0) { + NOTREACHED(); // Destructors might not have been called. + break; + } + } + + // Remove our stack allocated vector. + TlsSetValue(g_native_tls_key, NULL); +} + +} // namespace + +namespace base { + +ThreadLocalStorage::Slot::Slot(TLSDestructorFunc destructor) { + initialized_ = false; + slot_ = 0; + Initialize(destructor); +} + +bool ThreadLocalStorage::StaticSlot::Initialize(TLSDestructorFunc destructor) { + if (g_native_tls_key == TLS_OUT_OF_INDEXES || !TlsGetValue(g_native_tls_key)) + ConstructTlsVector(); + + // Grab a new slot. + slot_ = InterlockedIncrement(&g_last_used_tls_key); + DCHECK_GT(slot_, 0); + if (slot_ >= kThreadLocalStorageSize) { + NOTREACHED(); + return false; + } + + // Setup our destructor. + g_tls_destructors[slot_] = destructor; + initialized_ = true; + return true; +} + +void ThreadLocalStorage::StaticSlot::Free() { + // At this time, we don't reclaim old indices for TLS slots. + // So all we need to do is wipe the destructor. + DCHECK_GT(slot_, 0); + DCHECK_LT(slot_, kThreadLocalStorageSize); + g_tls_destructors[slot_] = NULL; + slot_ = 0; + initialized_ = false; +} + +void* ThreadLocalStorage::StaticSlot::Get() const { + void** tls_data = static_cast<void**>(TlsGetValue(g_native_tls_key)); + if (!tls_data) + tls_data = ConstructTlsVector(); + DCHECK_GT(slot_, 0); + DCHECK_LT(slot_, kThreadLocalStorageSize); + return tls_data[slot_]; +} + +void ThreadLocalStorage::StaticSlot::Set(void* value) { + void** tls_data = static_cast<void**>(TlsGetValue(g_native_tls_key)); + if (!tls_data) + tls_data = ConstructTlsVector(); + DCHECK_GT(slot_, 0); + DCHECK_LT(slot_, kThreadLocalStorageSize); + tls_data[slot_] = value; +} + +} // namespace base + +// Thread Termination Callbacks. +// Windows doesn't support a per-thread destructor with its +// TLS primitives. So, we build it manually by inserting a +// function to be called on each thread's exit. +// This magic is from http://www.codeproject.com/threads/tls.asp +// and it works for VC++ 7.0 and later. + +// Force a reference to _tls_used to make the linker create the TLS directory +// if it's not already there. (e.g. if __declspec(thread) is not used). +// Force a reference to p_thread_callback_base to prevent whole program +// optimization from discarding the variable. +#ifdef _WIN64 + +#pragma comment(linker, "/INCLUDE:_tls_used") +#pragma comment(linker, "/INCLUDE:p_thread_callback_base") + +#else // _WIN64 + +#pragma comment(linker, "/INCLUDE:__tls_used") +#pragma comment(linker, "/INCLUDE:_p_thread_callback_base") + +#endif // _WIN64 + +// Static callback function to call with each thread termination. +void NTAPI OnThreadExit(PVOID module, DWORD reason, PVOID reserved) { + // On XP SP0 & SP1, the DLL_PROCESS_ATTACH is never seen. It is sent on SP2+ + // and on W2K and W2K3. So don't assume it is sent. + if (DLL_THREAD_DETACH == reason || DLL_PROCESS_DETACH == reason) + WinThreadExit(); +} + +// .CRT$XLA to .CRT$XLZ is an array of PIMAGE_TLS_CALLBACK pointers that are +// called automatically by the OS loader code (not the CRT) when the module is +// loaded and on thread creation. They are NOT called if the module has been +// loaded by a LoadLibrary() call. It must have implicitly been loaded at +// process startup. +// By implicitly loaded, I mean that it is directly referenced by the main EXE +// or by one of its dependent DLLs. Delay-loaded DLL doesn't count as being +// implicitly loaded. +// +// See VC\crt\src\tlssup.c for reference. + +// extern "C" suppresses C++ name mangling so we know the symbol name for the +// linker /INCLUDE:symbol pragma above. +extern "C" { +// The linker must not discard p_thread_callback_base. (We force a reference +// to this variable with a linker /INCLUDE:symbol pragma to ensure that.) If +// this variable is discarded, the OnThreadExit function will never be called. +#ifdef _WIN64 + +// .CRT section is merged with .rdata on x64 so it must be constant data. +#pragma const_seg(".CRT$XLB") +// When defining a const variable, it must have external linkage to be sure the +// linker doesn't discard it. +extern const PIMAGE_TLS_CALLBACK p_thread_callback_base; +const PIMAGE_TLS_CALLBACK p_thread_callback_base = OnThreadExit; + +// Reset the default section. +#pragma const_seg() + +#else // _WIN64 + +#pragma data_seg(".CRT$XLB") +PIMAGE_TLS_CALLBACK p_thread_callback_base = OnThreadExit; + +// Reset the default section. +#pragma data_seg() + +#endif // _WIN64 +} // extern "C" diff --git a/chromium/base/threading/thread_local_unittest.cc b/chromium/base/threading/thread_local_unittest.cc new file mode 100644 index 00000000000..b125a484063 --- /dev/null +++ b/chromium/base/threading/thread_local_unittest.cc @@ -0,0 +1,169 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/logging.h" +#include "base/threading/simple_thread.h" +#include "base/threading/thread_local.h" +#include "base/synchronization/waitable_event.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace base { + +namespace { + +class ThreadLocalTesterBase : public base::DelegateSimpleThreadPool::Delegate { + public: + typedef base::ThreadLocalPointer<ThreadLocalTesterBase> TLPType; + + ThreadLocalTesterBase(TLPType* tlp, base::WaitableEvent* done) + : tlp_(tlp), + done_(done) { + } + virtual ~ThreadLocalTesterBase() {} + + protected: + TLPType* tlp_; + base::WaitableEvent* done_; +}; + +class SetThreadLocal : public ThreadLocalTesterBase { + public: + SetThreadLocal(TLPType* tlp, base::WaitableEvent* done) + : ThreadLocalTesterBase(tlp, done), + val_(NULL) { + } + virtual ~SetThreadLocal() {} + + void set_value(ThreadLocalTesterBase* val) { val_ = val; } + + virtual void Run() OVERRIDE { + DCHECK(!done_->IsSignaled()); + tlp_->Set(val_); + done_->Signal(); + } + + private: + ThreadLocalTesterBase* val_; +}; + +class GetThreadLocal : public ThreadLocalTesterBase { + public: + GetThreadLocal(TLPType* tlp, base::WaitableEvent* done) + : ThreadLocalTesterBase(tlp, done), + ptr_(NULL) { + } + virtual ~GetThreadLocal() {} + + void set_ptr(ThreadLocalTesterBase** ptr) { ptr_ = ptr; } + + virtual void Run() OVERRIDE { + DCHECK(!done_->IsSignaled()); + *ptr_ = tlp_->Get(); + done_->Signal(); + } + + private: + ThreadLocalTesterBase** ptr_; +}; + +} // namespace + +// In this test, we start 2 threads which will access a ThreadLocalPointer. We +// make sure the default is NULL, and the pointers are unique to the threads. +TEST(ThreadLocalTest, Pointer) { + base::DelegateSimpleThreadPool tp1("ThreadLocalTest tp1", 1); + base::DelegateSimpleThreadPool tp2("ThreadLocalTest tp1", 1); + tp1.Start(); + tp2.Start(); + + base::ThreadLocalPointer<ThreadLocalTesterBase> tlp; + + static ThreadLocalTesterBase* const kBogusPointer = + reinterpret_cast<ThreadLocalTesterBase*>(0x1234); + + ThreadLocalTesterBase* tls_val; + base::WaitableEvent done(true, false); + + GetThreadLocal getter(&tlp, &done); + getter.set_ptr(&tls_val); + + // Check that both threads defaulted to NULL. + tls_val = kBogusPointer; + done.Reset(); + tp1.AddWork(&getter); + done.Wait(); + EXPECT_EQ(static_cast<ThreadLocalTesterBase*>(NULL), tls_val); + + tls_val = kBogusPointer; + done.Reset(); + tp2.AddWork(&getter); + done.Wait(); + EXPECT_EQ(static_cast<ThreadLocalTesterBase*>(NULL), tls_val); + + + SetThreadLocal setter(&tlp, &done); + setter.set_value(kBogusPointer); + + // Have thread 1 set their pointer value to kBogusPointer. + done.Reset(); + tp1.AddWork(&setter); + done.Wait(); + + tls_val = NULL; + done.Reset(); + tp1.AddWork(&getter); + done.Wait(); + EXPECT_EQ(kBogusPointer, tls_val); + + // Make sure thread 2 is still NULL + tls_val = kBogusPointer; + done.Reset(); + tp2.AddWork(&getter); + done.Wait(); + EXPECT_EQ(static_cast<ThreadLocalTesterBase*>(NULL), tls_val); + + // Set thread 2 to kBogusPointer + 1. + setter.set_value(kBogusPointer + 1); + + done.Reset(); + tp2.AddWork(&setter); + done.Wait(); + + tls_val = NULL; + done.Reset(); + tp2.AddWork(&getter); + done.Wait(); + EXPECT_EQ(kBogusPointer + 1, tls_val); + + // Make sure thread 1 is still kBogusPointer. + tls_val = NULL; + done.Reset(); + tp1.AddWork(&getter); + done.Wait(); + EXPECT_EQ(kBogusPointer, tls_val); + + tp1.JoinAll(); + tp2.JoinAll(); +} + +TEST(ThreadLocalTest, Boolean) { + { + base::ThreadLocalBoolean tlb; + EXPECT_FALSE(tlb.Get()); + + tlb.Set(false); + EXPECT_FALSE(tlb.Get()); + + tlb.Set(true); + EXPECT_TRUE(tlb.Get()); + } + + // Our slot should have been freed, we're all reset. + { + base::ThreadLocalBoolean tlb; + EXPECT_FALSE(tlb.Get()); + } +} + +} // namespace base diff --git a/chromium/base/threading/thread_local_win.cc b/chromium/base/threading/thread_local_win.cc new file mode 100644 index 00000000000..56d3a3ac7f2 --- /dev/null +++ b/chromium/base/threading/thread_local_win.cc @@ -0,0 +1,42 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/thread_local.h" + +#include <windows.h> + +#include "base/logging.h" + +namespace base { + +namespace internal { + +// static +void ThreadLocalPlatform::AllocateSlot(SlotType& slot) { + slot = TlsAlloc(); + CHECK_NE(slot, TLS_OUT_OF_INDEXES); +} + +// static +void ThreadLocalPlatform::FreeSlot(SlotType& slot) { + if (!TlsFree(slot)) { + NOTREACHED() << "Failed to deallocate tls slot with TlsFree()."; + } +} + +// static +void* ThreadLocalPlatform::GetValueFromSlot(SlotType& slot) { + return TlsGetValue(slot); +} + +// static +void ThreadLocalPlatform::SetValueInSlot(SlotType& slot, void* value) { + if (!TlsSetValue(slot, value)) { + LOG(FATAL) << "Failed to TlsSetValue()."; + } +} + +} // namespace internal + +} // namespace base diff --git a/chromium/base/threading/thread_restrictions.cc b/chromium/base/threading/thread_restrictions.cc new file mode 100644 index 00000000000..871f2dc874c --- /dev/null +++ b/chromium/base/threading/thread_restrictions.cc @@ -0,0 +1,85 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/thread_restrictions.h" + +#if ENABLE_THREAD_RESTRICTIONS + +#include "base/lazy_instance.h" +#include "base/logging.h" +#include "base/threading/thread_local.h" + +namespace base { + +namespace { + +LazyInstance<ThreadLocalBoolean>::Leaky + g_io_disallowed = LAZY_INSTANCE_INITIALIZER; + +LazyInstance<ThreadLocalBoolean>::Leaky + g_singleton_disallowed = LAZY_INSTANCE_INITIALIZER; + +LazyInstance<ThreadLocalBoolean>::Leaky + g_wait_disallowed = LAZY_INSTANCE_INITIALIZER; + +} // anonymous namespace + +// static +bool ThreadRestrictions::SetIOAllowed(bool allowed) { + bool previous_disallowed = g_io_disallowed.Get().Get(); + g_io_disallowed.Get().Set(!allowed); + return !previous_disallowed; +} + +// static +void ThreadRestrictions::AssertIOAllowed() { + if (g_io_disallowed.Get().Get()) { + LOG(FATAL) << + "Function marked as IO-only was called from a thread that " + "disallows IO! If this thread really should be allowed to " + "make IO calls, adjust the call to " + "base::ThreadRestrictions::SetIOAllowed() in this thread's " + "startup."; + } +} + +// static +bool ThreadRestrictions::SetSingletonAllowed(bool allowed) { + bool previous_disallowed = g_singleton_disallowed.Get().Get(); + g_singleton_disallowed.Get().Set(!allowed); + return !previous_disallowed; +} + +// static +void ThreadRestrictions::AssertSingletonAllowed() { + if (g_singleton_disallowed.Get().Get()) { + LOG(FATAL) << "LazyInstance/Singleton is not allowed to be used on this " + << "thread. Most likely it's because this thread is not " + << "joinable, so AtExitManager may have deleted the object " + << "on shutdown, leading to a potential shutdown crash."; + } +} + +// static +void ThreadRestrictions::DisallowWaiting() { + g_wait_disallowed.Get().Set(true); +} + +// static +void ThreadRestrictions::AssertWaitAllowed() { + if (g_wait_disallowed.Get().Get()) { + LOG(FATAL) << "Waiting is not allowed to be used on this thread to prevent" + << "jank and deadlock."; + } +} + +bool ThreadRestrictions::SetWaitAllowed(bool allowed) { + bool previous_disallowed = g_wait_disallowed.Get().Get(); + g_wait_disallowed.Get().Set(!allowed); + return !previous_disallowed; +} + +} // namespace base + +#endif // ENABLE_THREAD_RESTRICTIONS diff --git a/chromium/base/threading/thread_restrictions.h b/chromium/base/threading/thread_restrictions.h new file mode 100644 index 00000000000..595b97060e0 --- /dev/null +++ b/chromium/base/threading/thread_restrictions.h @@ -0,0 +1,247 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef BASE_THREADING_THREAD_RESTRICTIONS_H_ +#define BASE_THREADING_THREAD_RESTRICTIONS_H_ + +#include "base/base_export.h" +#include "base/basictypes.h" + +// See comment at top of thread_checker.h +#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)) +#define ENABLE_THREAD_RESTRICTIONS 1 +#else +#define ENABLE_THREAD_RESTRICTIONS 0 +#endif + +class AcceleratedPresenter; +class BrowserProcessImpl; +class HistogramSynchronizer; +class MetricsService; +class NativeBackendKWallet; +class ScopedAllowWaitForLegacyWebViewApi; +class TestingAutomationProvider; + +namespace browser_sync { +class NonFrontendDataTypeController; +class UIModelWorker; +} +namespace cc { +class CompletionEvent; +} +namespace chromeos { +class AudioMixerAlsa; +class BlockingMethodCaller; +namespace system { +class StatisticsProviderImpl; +} +} +namespace chrome_browser_net { +class Predictor; +} +namespace content { +class BrowserGpuChannelHostFactory; +class BrowserTestBase; +class GLHelper; +class GpuChannelHost; +class RenderWidgetHelper; +class ScopedAllowWaitForAndroidLayoutTests; +class TextInputClientMac; +} +namespace dbus { +class Bus; +} +namespace disk_cache { +class BackendImpl; +class InFlightIO; +} +namespace media { +class AudioOutputController; +} +namespace net { +class FileStreamPosix; +class FileStreamWin; +namespace internal { +class AddressTrackerLinux; +} +} + +namespace remoting { +class AutoThread; +} + +namespace base { + +namespace android { +class JavaHandlerThread; +} + +class SequencedWorkerPool; +class SimpleThread; +class Thread; +class ThreadTestHelper; + +// Certain behavior is disallowed on certain threads. ThreadRestrictions helps +// enforce these rules. Examples of such rules: +// +// * Do not do blocking IO (makes the thread janky) +// * Do not access Singleton/LazyInstance (may lead to shutdown crashes) +// +// Here's more about how the protection works: +// +// 1) If a thread should not be allowed to make IO calls, mark it: +// base::ThreadRestrictions::SetIOAllowed(false); +// By default, threads *are* allowed to make IO calls. +// In Chrome browser code, IO calls should be proxied to the File thread. +// +// 2) If a function makes a call that will go out to disk, check whether the +// current thread is allowed: +// base::ThreadRestrictions::AssertIOAllowed(); +// +// +// Style tip: where should you put AssertIOAllowed checks? It's best +// if you put them as close to the disk access as possible, at the +// lowest level. This rule is simple to follow and helps catch all +// callers. For example, if your function GoDoSomeBlockingDiskCall() +// only calls other functions in Chrome and not fopen(), you should go +// add the AssertIOAllowed checks in the helper functions. + +class BASE_EXPORT ThreadRestrictions { + public: + // Constructing a ScopedAllowIO temporarily allows IO for the current + // thread. Doing this is almost certainly always incorrect. + class BASE_EXPORT ScopedAllowIO { + public: + ScopedAllowIO() { previous_value_ = SetIOAllowed(true); } + ~ScopedAllowIO() { SetIOAllowed(previous_value_); } + private: + // Whether IO is allowed when the ScopedAllowIO was constructed. + bool previous_value_; + + DISALLOW_COPY_AND_ASSIGN(ScopedAllowIO); + }; + + // Constructing a ScopedAllowSingleton temporarily allows accessing for the + // current thread. Doing this is almost always incorrect. + class BASE_EXPORT ScopedAllowSingleton { + public: + ScopedAllowSingleton() { previous_value_ = SetSingletonAllowed(true); } + ~ScopedAllowSingleton() { SetSingletonAllowed(previous_value_); } + private: + // Whether singleton use is allowed when the ScopedAllowSingleton was + // constructed. + bool previous_value_; + + DISALLOW_COPY_AND_ASSIGN(ScopedAllowSingleton); + }; + +#if ENABLE_THREAD_RESTRICTIONS + // Set whether the current thread to make IO calls. + // Threads start out in the *allowed* state. + // Returns the previous value. + static bool SetIOAllowed(bool allowed); + + // Check whether the current thread is allowed to make IO calls, + // and DCHECK if not. See the block comment above the class for + // a discussion of where to add these checks. + static void AssertIOAllowed(); + + // Set whether the current thread can use singletons. Returns the previous + // value. + static bool SetSingletonAllowed(bool allowed); + + // Check whether the current thread is allowed to use singletons (Singleton / + // LazyInstance). DCHECKs if not. + static void AssertSingletonAllowed(); + + // Disable waiting on the current thread. Threads start out in the *allowed* + // state. Returns the previous value. + static void DisallowWaiting(); + + // Check whether the current thread is allowed to wait, and DCHECK if not. + static void AssertWaitAllowed(); +#else + // Inline the empty definitions of these functions so that they can be + // compiled out. + static bool SetIOAllowed(bool allowed) { return true; } + static void AssertIOAllowed() {} + static bool SetSingletonAllowed(bool allowed) { return true; } + static void AssertSingletonAllowed() {} + static void DisallowWaiting() {} + static void AssertWaitAllowed() {} +#endif + + private: + // DO NOT ADD ANY OTHER FRIEND STATEMENTS, talk to jam or brettw first. + // BEGIN ALLOWED USAGE. + friend class content::BrowserTestBase; + friend class content::RenderWidgetHelper; + friend class content::ScopedAllowWaitForAndroidLayoutTests; + friend class ::HistogramSynchronizer; + friend class ::ScopedAllowWaitForLegacyWebViewApi; + friend class ::TestingAutomationProvider; + friend class cc::CompletionEvent; + friend class remoting::AutoThread; + friend class MessagePumpDefault; + friend class SequencedWorkerPool; + friend class SimpleThread; + friend class Thread; + friend class ThreadTestHelper; + friend class PlatformThread; + friend class android::JavaHandlerThread; + + // END ALLOWED USAGE. + // BEGIN USAGE THAT NEEDS TO BE FIXED. + friend class ::chromeos::AudioMixerAlsa; // http://crbug.com/125206 + friend class ::chromeos::BlockingMethodCaller; // http://crbug.com/125360 + friend class ::chromeos::system::StatisticsProviderImpl; // http://crbug.com/125385 + friend class browser_sync::NonFrontendDataTypeController; // http://crbug.com/19757 + friend class browser_sync::UIModelWorker; // http://crbug.com/19757 + friend class chrome_browser_net::Predictor; // http://crbug.com/78451 + friend class + content::BrowserGpuChannelHostFactory; // http://crbug.com/125248 + friend class content::GLHelper; // http://crbug.com/125415 + friend class content::GpuChannelHost; // http://crbug.com/125264 + friend class content::TextInputClientMac; // http://crbug.com/121917 + friend class dbus::Bus; // http://crbug.com/125222 + friend class disk_cache::BackendImpl; // http://crbug.com/74623 + friend class disk_cache::InFlightIO; // http://crbug.com/74623 + friend class media::AudioOutputController; // http://crbug.com/120973 + friend class net::FileStreamPosix; // http://crbug.com/115067 + friend class net::FileStreamWin; // http://crbug.com/115067 + friend class net::internal::AddressTrackerLinux; // http://crbug.com/125097 + friend class ::AcceleratedPresenter; // http://crbug.com/125391 + friend class ::BrowserProcessImpl; // http://crbug.com/125207 + friend class ::MetricsService; // http://crbug.com/124954 + friend class ::NativeBackendKWallet; // http://crbug.com/125331 + // END USAGE THAT NEEDS TO BE FIXED. + +#if ENABLE_THREAD_RESTRICTIONS + static bool SetWaitAllowed(bool allowed); +#else + static bool SetWaitAllowed(bool allowed) { return true; } +#endif + + // Constructing a ScopedAllowWait temporarily allows waiting on the current + // thread. Doing this is almost always incorrect, which is why we limit who + // can use this through friend. If you find yourself needing to use this, find + // another way. Talk to jam or brettw. + class BASE_EXPORT ScopedAllowWait { + public: + ScopedAllowWait() { previous_value_ = SetWaitAllowed(true); } + ~ScopedAllowWait() { SetWaitAllowed(previous_value_); } + private: + // Whether singleton use is allowed when the ScopedAllowWait was + // constructed. + bool previous_value_; + + DISALLOW_COPY_AND_ASSIGN(ScopedAllowWait); + }; + + DISALLOW_IMPLICIT_CONSTRUCTORS(ThreadRestrictions); +}; + +} // namespace base + +#endif // BASE_THREADING_THREAD_RESTRICTIONS_H_ diff --git a/chromium/base/threading/thread_unittest.cc b/chromium/base/threading/thread_unittest.cc new file mode 100644 index 00000000000..8bd817c75bc --- /dev/null +++ b/chromium/base/threading/thread_unittest.cc @@ -0,0 +1,239 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/thread.h" + +#include <vector> + +#include "base/bind.h" +#include "base/message_loop/message_loop.h" +#include "base/third_party/dynamic_annotations/dynamic_annotations.h" +#include "testing/gtest/include/gtest/gtest.h" +#include "testing/platform_test.h" + +using base::Thread; + +typedef PlatformTest ThreadTest; + +namespace { + +void ToggleValue(bool* value) { + ANNOTATE_BENIGN_RACE(value, "Test-only data race on boolean " + "in base/thread_unittest"); + *value = !*value; +} + +class SleepInsideInitThread : public Thread { + public: + SleepInsideInitThread() : Thread("none") { + init_called_ = false; + ANNOTATE_BENIGN_RACE( + this, "Benign test-only data race on vptr - http://crbug.com/98219"); + } + virtual ~SleepInsideInitThread() { + Stop(); + } + + virtual void Init() OVERRIDE { + base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(500)); + init_called_ = true; + } + bool InitCalled() { return init_called_; } + private: + bool init_called_; +}; + +enum ThreadEvent { + // Thread::Init() was called. + THREAD_EVENT_INIT = 0, + + // The MessageLoop for the thread was deleted. + THREAD_EVENT_MESSAGE_LOOP_DESTROYED, + + // Thread::CleanUp() was called. + THREAD_EVENT_CLEANUP, + + // Keep at end of list. + THREAD_NUM_EVENTS +}; + +typedef std::vector<ThreadEvent> EventList; + +class CaptureToEventList : public Thread { + public: + // This Thread pushes events into the vector |event_list| to show + // the order they occured in. |event_list| must remain valid for the + // lifetime of this thread. + explicit CaptureToEventList(EventList* event_list) + : Thread("none"), + event_list_(event_list) { + } + + virtual ~CaptureToEventList() { + Stop(); + } + + virtual void Init() OVERRIDE { + event_list_->push_back(THREAD_EVENT_INIT); + } + + virtual void CleanUp() OVERRIDE { + event_list_->push_back(THREAD_EVENT_CLEANUP); + } + + private: + EventList* event_list_; +}; + +// Observer that writes a value into |event_list| when a message loop has been +// destroyed. +class CapturingDestructionObserver + : public base::MessageLoop::DestructionObserver { + public: + // |event_list| must remain valid throughout the observer's lifetime. + explicit CapturingDestructionObserver(EventList* event_list) + : event_list_(event_list) { + } + + // DestructionObserver implementation: + virtual void WillDestroyCurrentMessageLoop() OVERRIDE { + event_list_->push_back(THREAD_EVENT_MESSAGE_LOOP_DESTROYED); + event_list_ = NULL; + } + + private: + EventList* event_list_; +}; + +// Task that adds a destruction observer to the current message loop. +void RegisterDestructionObserver( + base::MessageLoop::DestructionObserver* observer) { + base::MessageLoop::current()->AddDestructionObserver(observer); +} + +} // namespace + +TEST_F(ThreadTest, Restart) { + Thread a("Restart"); + a.Stop(); + EXPECT_FALSE(a.message_loop()); + EXPECT_FALSE(a.IsRunning()); + EXPECT_TRUE(a.Start()); + EXPECT_TRUE(a.message_loop()); + EXPECT_TRUE(a.IsRunning()); + a.Stop(); + EXPECT_FALSE(a.message_loop()); + EXPECT_FALSE(a.IsRunning()); + EXPECT_TRUE(a.Start()); + EXPECT_TRUE(a.message_loop()); + EXPECT_TRUE(a.IsRunning()); + a.Stop(); + EXPECT_FALSE(a.message_loop()); + EXPECT_FALSE(a.IsRunning()); + a.Stop(); + EXPECT_FALSE(a.message_loop()); + EXPECT_FALSE(a.IsRunning()); +} + +TEST_F(ThreadTest, StartWithOptions_StackSize) { + Thread a("StartWithStackSize"); + // Ensure that the thread can work with only 12 kb and still process a + // message. + Thread::Options options; + options.stack_size = 12*1024; + EXPECT_TRUE(a.StartWithOptions(options)); + EXPECT_TRUE(a.message_loop()); + EXPECT_TRUE(a.IsRunning()); + + bool was_invoked = false; + a.message_loop()->PostTask(FROM_HERE, base::Bind(&ToggleValue, &was_invoked)); + + // wait for the task to run (we could use a kernel event here + // instead to avoid busy waiting, but this is sufficient for + // testing purposes). + for (int i = 100; i >= 0 && !was_invoked; --i) { + base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(10)); + } + EXPECT_TRUE(was_invoked); +} + +TEST_F(ThreadTest, TwoTasks) { + bool was_invoked = false; + { + Thread a("TwoTasks"); + EXPECT_TRUE(a.Start()); + EXPECT_TRUE(a.message_loop()); + + // Test that all events are dispatched before the Thread object is + // destroyed. We do this by dispatching a sleep event before the + // event that will toggle our sentinel value. + a.message_loop()->PostTask( + FROM_HERE, + base::Bind( + static_cast<void (*)(base::TimeDelta)>( + &base::PlatformThread::Sleep), + base::TimeDelta::FromMilliseconds(20))); + a.message_loop()->PostTask(FROM_HERE, base::Bind(&ToggleValue, + &was_invoked)); + } + EXPECT_TRUE(was_invoked); +} + +TEST_F(ThreadTest, StopSoon) { + Thread a("StopSoon"); + EXPECT_TRUE(a.Start()); + EXPECT_TRUE(a.message_loop()); + EXPECT_TRUE(a.IsRunning()); + a.StopSoon(); + a.StopSoon(); + a.Stop(); + EXPECT_FALSE(a.message_loop()); + EXPECT_FALSE(a.IsRunning()); +} + +TEST_F(ThreadTest, ThreadName) { + Thread a("ThreadName"); + EXPECT_TRUE(a.Start()); + EXPECT_EQ("ThreadName", a.thread_name()); +} + +// Make sure we can't use a thread between Start() and Init(). +TEST_F(ThreadTest, SleepInsideInit) { + SleepInsideInitThread t; + EXPECT_FALSE(t.InitCalled()); + t.Start(); + EXPECT_TRUE(t.InitCalled()); +} + +// Make sure that the destruction sequence is: +// +// (1) Thread::CleanUp() +// (2) MessageLoop::~MessageLoop() +// MessageLoop::DestructionObservers called. +TEST_F(ThreadTest, CleanUp) { + EventList captured_events; + CapturingDestructionObserver loop_destruction_observer(&captured_events); + + { + // Start a thread which writes its event into |captured_events|. + CaptureToEventList t(&captured_events); + EXPECT_TRUE(t.Start()); + EXPECT_TRUE(t.message_loop()); + EXPECT_TRUE(t.IsRunning()); + + // Register an observer that writes into |captured_events| once the + // thread's message loop is destroyed. + t.message_loop()->PostTask( + FROM_HERE, base::Bind(&RegisterDestructionObserver, + base::Unretained(&loop_destruction_observer))); + + // Upon leaving this scope, the thread is deleted. + } + + // Check the order of events during shutdown. + ASSERT_EQ(static_cast<size_t>(THREAD_NUM_EVENTS), captured_events.size()); + EXPECT_EQ(THREAD_EVENT_INIT, captured_events[0]); + EXPECT_EQ(THREAD_EVENT_CLEANUP, captured_events[1]); + EXPECT_EQ(THREAD_EVENT_MESSAGE_LOOP_DESTROYED, captured_events[2]); +} diff --git a/chromium/base/threading/watchdog.cc b/chromium/base/threading/watchdog.cc new file mode 100644 index 00000000000..a18efecc5f6 --- /dev/null +++ b/chromium/base/threading/watchdog.cc @@ -0,0 +1,178 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/watchdog.h" + +#include "base/compiler_specific.h" +#include "base/lazy_instance.h" +#include "base/logging.h" +#include "base/threading/platform_thread.h" + +namespace base { + +namespace { + +// When the debugger breaks (when we alarm), all the other alarms that are +// armed will expire (also alarm). To diminish this effect, we track any +// delay due to debugger breaks, and we *try* to adjust the effective start +// time of other alarms to step past the debugging break. +// Without this safety net, any alarm will typically trigger a host of follow +// on alarms from callers that specify old times. + +// Lock for access of static data... +LazyInstance<Lock>::Leaky g_static_lock = LAZY_INSTANCE_INITIALIZER; + +// When did we last alarm and get stuck (for a while) in a debugger? +TimeTicks g_last_debugged_alarm_time; + +// How long did we sit on a break in the debugger? +TimeDelta g_last_debugged_alarm_delay; + +} // namespace + +// Start thread running in a Disarmed state. +Watchdog::Watchdog(const TimeDelta& duration, + const std::string& thread_watched_name, + bool enabled) + : enabled_(enabled), + lock_(), + condition_variable_(&lock_), + state_(DISARMED), + duration_(duration), + thread_watched_name_(thread_watched_name), + delegate_(this) { + if (!enabled_) + return; // Don't start thread, or doing anything really. + enabled_ = PlatformThread::Create(0, // Default stack size. + &delegate_, + &handle_); + DCHECK(enabled_); +} + +// Notify watchdog thread, and wait for it to finish up. +Watchdog::~Watchdog() { + if (!enabled_) + return; + if (!IsJoinable()) + Cleanup(); + condition_variable_.Signal(); + PlatformThread::Join(handle_); +} + +void Watchdog::Cleanup() { + if (!enabled_) + return; + { + AutoLock lock(lock_); + state_ = SHUTDOWN; + } + condition_variable_.Signal(); +} + +bool Watchdog::IsJoinable() { + if (!enabled_) + return true; + AutoLock lock(lock_); + return (state_ == JOINABLE); +} + +void Watchdog::Arm() { + ArmAtStartTime(TimeTicks::Now()); +} + +void Watchdog::ArmSomeTimeDeltaAgo(const TimeDelta& time_delta) { + ArmAtStartTime(TimeTicks::Now() - time_delta); +} + +// Start clock for watchdog. +void Watchdog::ArmAtStartTime(const TimeTicks start_time) { + { + AutoLock lock(lock_); + start_time_ = start_time; + state_ = ARMED; + } + // Force watchdog to wake up, and go to sleep with the timer ticking with the + // proper duration. + condition_variable_.Signal(); +} + +// Disable watchdog so that it won't do anything when time expires. +void Watchdog::Disarm() { + AutoLock lock(lock_); + state_ = DISARMED; + // We don't need to signal, as the watchdog will eventually wake up, and it + // will check its state and time, and act accordingly. +} + +void Watchdog::Alarm() { + DVLOG(1) << "Watchdog alarmed for " << thread_watched_name_; +} + +//------------------------------------------------------------------------------ +// Internal private methods that the watchdog thread uses. + +void Watchdog::ThreadDelegate::ThreadMain() { + SetThreadName(); + TimeDelta remaining_duration; + while (1) { + AutoLock lock(watchdog_->lock_); + while (DISARMED == watchdog_->state_) + watchdog_->condition_variable_.Wait(); + if (SHUTDOWN == watchdog_->state_) { + watchdog_->state_ = JOINABLE; + return; + } + DCHECK(ARMED == watchdog_->state_); + remaining_duration = watchdog_->duration_ - + (TimeTicks::Now() - watchdog_->start_time_); + if (remaining_duration.InMilliseconds() > 0) { + // Spurios wake? Timer drifts? Go back to sleep for remaining time. + watchdog_->condition_variable_.TimedWait(remaining_duration); + continue; + } + // We overslept, so this seems like a real alarm. + // Watch out for a user that stopped the debugger on a different alarm! + { + AutoLock static_lock(*g_static_lock.Pointer()); + if (g_last_debugged_alarm_time > watchdog_->start_time_) { + // False alarm: we started our clock before the debugger break (last + // alarm time). + watchdog_->start_time_ += g_last_debugged_alarm_delay; + if (g_last_debugged_alarm_time > watchdog_->start_time_) + // Too many alarms must have taken place. + watchdog_->state_ = DISARMED; + continue; + } + } + watchdog_->state_ = DISARMED; // Only alarm at most once. + TimeTicks last_alarm_time = TimeTicks::Now(); + { + AutoUnlock lock(watchdog_->lock_); + watchdog_->Alarm(); // Set a break point here to debug on alarms. + } + TimeDelta last_alarm_delay = TimeTicks::Now() - last_alarm_time; + if (last_alarm_delay <= TimeDelta::FromMilliseconds(2)) + continue; + // Ignore race of two alarms/breaks going off at roughly the same time. + AutoLock static_lock(*g_static_lock.Pointer()); + // This was a real debugger break. + g_last_debugged_alarm_time = last_alarm_time; + g_last_debugged_alarm_delay = last_alarm_delay; + } +} + +void Watchdog::ThreadDelegate::SetThreadName() const { + std::string name = watchdog_->thread_watched_name_ + " Watchdog"; + PlatformThread::SetName(name.c_str()); + DVLOG(1) << "Watchdog active: " << name; +} + +// static +void Watchdog::ResetStaticData() { + AutoLock lock(*g_static_lock.Pointer()); + g_last_debugged_alarm_time = TimeTicks(); + g_last_debugged_alarm_delay = TimeDelta(); +} + +} // namespace base diff --git a/chromium/base/threading/watchdog.h b/chromium/base/threading/watchdog.h new file mode 100644 index 00000000000..abcb4f12809 --- /dev/null +++ b/chromium/base/threading/watchdog.h @@ -0,0 +1,94 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// The Watchdog class creates a second thread that can Alarm if a specific +// duration of time passes without proper attention. The duration of time is +// specified at construction time. The Watchdog may be used many times by +// simply calling Arm() (to start timing) and Disarm() (to reset the timer). +// The Watchdog is typically used under a debugger, where the stack traces on +// other threads can be examined if/when the Watchdog alarms. + +// Some watchdogs will be enabled or disabled via command line switches. To +// facilitate such code, an "enabled" argument for the constuctor can be used +// to permanently disable the watchdog. Disabled watchdogs don't even spawn +// a second thread, and their methods call (Arm() and Disarm()) return very +// quickly. + +#ifndef BASE_THREADING_WATCHDOG_H_ +#define BASE_THREADING_WATCHDOG_H_ + +#include <string> + +#include "base/base_export.h" +#include "base/compiler_specific.h" +#include "base/synchronization/condition_variable.h" +#include "base/synchronization/lock.h" +#include "base/threading/platform_thread.h" +#include "base/time/time.h" + +namespace base { + +class BASE_EXPORT Watchdog { + public: + // Constructor specifies how long the Watchdog will wait before alarming. + Watchdog(const TimeDelta& duration, + const std::string& thread_watched_name, + bool enabled); + virtual ~Watchdog(); + + // Notify watchdog thread to finish up. Sets the state_ to SHUTDOWN. + void Cleanup(); + + // Returns true if we state_ is JOINABLE (which indicates that Watchdog has + // exited). + bool IsJoinable(); + + // Start timing, and alarm when time expires (unless we're disarm()ed.) + void Arm(); // Arm starting now. + void ArmSomeTimeDeltaAgo(const TimeDelta& time_delta); + void ArmAtStartTime(const TimeTicks start_time); + + // Reset time, and do not set off the alarm. + void Disarm(); + + // Alarm is called if the time expires after an Arm() without someone calling + // Disarm(). This method can be overridden to create testable classes. + virtual void Alarm(); + + // Reset static data to initial state. Useful for tests, to ensure + // they are independent. + static void ResetStaticData(); + + private: + class ThreadDelegate : public PlatformThread::Delegate { + public: + explicit ThreadDelegate(Watchdog* watchdog) : watchdog_(watchdog) { + } + virtual void ThreadMain() OVERRIDE; + private: + void SetThreadName() const; + + Watchdog* watchdog_; + }; + + enum State {ARMED, DISARMED, SHUTDOWN, JOINABLE }; + + bool enabled_; + + Lock lock_; // Mutex for state_. + ConditionVariable condition_variable_; + State state_; + const TimeDelta duration_; // How long after start_time_ do we alarm? + const std::string thread_watched_name_; + PlatformThreadHandle handle_; + ThreadDelegate delegate_; // Store it, because it must outlive the thread. + + TimeTicks start_time_; // Start of epoch, and alarm after duration_. + + DISALLOW_COPY_AND_ASSIGN(Watchdog); +}; + +} // namespace base + +#endif // BASE_THREADING_WATCHDOG_H_ diff --git a/chromium/base/threading/watchdog_unittest.cc b/chromium/base/threading/watchdog_unittest.cc new file mode 100644 index 00000000000..7a4be4c764c --- /dev/null +++ b/chromium/base/threading/watchdog_unittest.cc @@ -0,0 +1,142 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/watchdog.h" + +#include "base/logging.h" +#include "base/synchronization/spin_wait.h" +#include "base/threading/platform_thread.h" +#include "base/time/time.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace base { + +namespace { + +//------------------------------------------------------------------------------ +// Provide a derived class to facilitate testing. + +class WatchdogCounter : public Watchdog { + public: + WatchdogCounter(const TimeDelta& duration, + const std::string& thread_watched_name, + bool enabled) + : Watchdog(duration, thread_watched_name, enabled), + alarm_counter_(0) { + } + + virtual ~WatchdogCounter() {} + + virtual void Alarm() OVERRIDE { + alarm_counter_++; + Watchdog::Alarm(); + } + + int alarm_counter() { return alarm_counter_; } + + private: + int alarm_counter_; + + DISALLOW_COPY_AND_ASSIGN(WatchdogCounter); +}; + +class WatchdogTest : public testing::Test { + public: + virtual void SetUp() OVERRIDE { + Watchdog::ResetStaticData(); + } +}; + +} // namespace + +//------------------------------------------------------------------------------ +// Actual tests + +// Minimal constructor/destructor test. +TEST_F(WatchdogTest, StartupShutdownTest) { + Watchdog watchdog1(TimeDelta::FromMilliseconds(300), "Disabled", false); + Watchdog watchdog2(TimeDelta::FromMilliseconds(300), "Enabled", true); +} + +// Test ability to call Arm and Disarm repeatedly. +TEST_F(WatchdogTest, ArmDisarmTest) { + Watchdog watchdog1(TimeDelta::FromMilliseconds(300), "Disabled", false); + watchdog1.Arm(); + watchdog1.Disarm(); + watchdog1.Arm(); + watchdog1.Disarm(); + + Watchdog watchdog2(TimeDelta::FromMilliseconds(300), "Enabled", true); + watchdog2.Arm(); + watchdog2.Disarm(); + watchdog2.Arm(); + watchdog2.Disarm(); +} + +// Make sure a basic alarm fires when the time has expired. +TEST_F(WatchdogTest, AlarmTest) { + WatchdogCounter watchdog(TimeDelta::FromMilliseconds(10), "Enabled", true); + watchdog.Arm(); + SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(5), + watchdog.alarm_counter() > 0); + EXPECT_EQ(1, watchdog.alarm_counter()); +} + +// Make sure a basic alarm fires when the time has expired. +TEST_F(WatchdogTest, AlarmPriorTimeTest) { + WatchdogCounter watchdog(TimeDelta(), "Enabled2", true); + // Set a time in the past. + watchdog.ArmSomeTimeDeltaAgo(TimeDelta::FromSeconds(2)); + // It should instantly go off, but certainly in less than 5 minutes. + SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(5), + watchdog.alarm_counter() > 0); + + EXPECT_EQ(1, watchdog.alarm_counter()); +} + +// Make sure a disable alarm does nothing, even if we arm it. +TEST_F(WatchdogTest, ConstructorDisabledTest) { + WatchdogCounter watchdog(TimeDelta::FromMilliseconds(10), "Disabled", false); + watchdog.Arm(); + // Alarm should not fire, as it was disabled. + PlatformThread::Sleep(TimeDelta::FromMilliseconds(500)); + EXPECT_EQ(0, watchdog.alarm_counter()); +} + +// Make sure Disarming will prevent firing, even after Arming. +TEST_F(WatchdogTest, DisarmTest) { + WatchdogCounter watchdog(TimeDelta::FromSeconds(1), "Enabled3", true); + + TimeTicks start = TimeTicks::Now(); + watchdog.Arm(); + // Sleep a bit, but not past the alarm point. + PlatformThread::Sleep(TimeDelta::FromMilliseconds(100)); + watchdog.Disarm(); + TimeTicks end = TimeTicks::Now(); + + if (end - start > TimeDelta::FromMilliseconds(500)) { + LOG(WARNING) << "100ms sleep took over 500ms, making the results of this " + << "timing-sensitive test suspicious. Aborting now."; + return; + } + + // Alarm should not have fired before it was disarmed. + EXPECT_EQ(0, watchdog.alarm_counter()); + + // Sleep past the point where it would have fired if it wasn't disarmed, + // and verify that it didn't fire. + PlatformThread::Sleep(TimeDelta::FromSeconds(1)); + EXPECT_EQ(0, watchdog.alarm_counter()); + + // ...but even after disarming, we can still use the alarm... + // Set a time greater than the timeout into the past. + watchdog.ArmSomeTimeDeltaAgo(TimeDelta::FromSeconds(10)); + // It should almost instantly go off, but certainly in less than 5 minutes. + SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(5), + watchdog.alarm_counter() > 0); + + EXPECT_EQ(1, watchdog.alarm_counter()); +} + +} // namespace base diff --git a/chromium/base/threading/worker_pool.cc b/chromium/base/threading/worker_pool.cc new file mode 100644 index 00000000000..9e45f8c89b8 --- /dev/null +++ b/chromium/base/threading/worker_pool.cc @@ -0,0 +1,117 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/worker_pool.h" + +#include "base/bind.h" +#include "base/compiler_specific.h" +#include "base/lazy_instance.h" +#include "base/task_runner.h" +#include "base/threading/post_task_and_reply_impl.h" +#include "base/tracked_objects.h" + +namespace base { + +namespace { + +class PostTaskAndReplyWorkerPool : public internal::PostTaskAndReplyImpl { + public: + explicit PostTaskAndReplyWorkerPool(bool task_is_slow) + : task_is_slow_(task_is_slow) { + } + + private: + virtual bool PostTask(const tracked_objects::Location& from_here, + const Closure& task) OVERRIDE { + return WorkerPool::PostTask(from_here, task, task_is_slow_); + } + + bool task_is_slow_; +}; + +// WorkerPoolTaskRunner --------------------------------------------- +// A TaskRunner which posts tasks to a WorkerPool with a +// fixed ShutdownBehavior. +// +// Note that this class is RefCountedThreadSafe (inherited from TaskRunner). +class WorkerPoolTaskRunner : public TaskRunner { + public: + explicit WorkerPoolTaskRunner(bool tasks_are_slow); + + // TaskRunner implementation + virtual bool PostDelayedTask(const tracked_objects::Location& from_here, + const Closure& task, + TimeDelta delay) OVERRIDE; + virtual bool RunsTasksOnCurrentThread() const OVERRIDE; + + private: + virtual ~WorkerPoolTaskRunner(); + + // Helper function for posting a delayed task. Asserts that the delay is + // zero because non-zero delays are not supported. + bool PostDelayedTaskAssertZeroDelay( + const tracked_objects::Location& from_here, + const Closure& task, + base::TimeDelta delay); + + const bool tasks_are_slow_; + + DISALLOW_COPY_AND_ASSIGN(WorkerPoolTaskRunner); +}; + +WorkerPoolTaskRunner::WorkerPoolTaskRunner(bool tasks_are_slow) + : tasks_are_slow_(tasks_are_slow) { +} + +WorkerPoolTaskRunner::~WorkerPoolTaskRunner() { +} + +bool WorkerPoolTaskRunner::PostDelayedTask( + const tracked_objects::Location& from_here, + const Closure& task, + TimeDelta delay) { + return PostDelayedTaskAssertZeroDelay(from_here, task, delay); +} + +bool WorkerPoolTaskRunner::RunsTasksOnCurrentThread() const { + return WorkerPool::RunsTasksOnCurrentThread(); +} + +bool WorkerPoolTaskRunner::PostDelayedTaskAssertZeroDelay( + const tracked_objects::Location& from_here, + const Closure& task, + base::TimeDelta delay) { + DCHECK_EQ(delay.InMillisecondsRoundedUp(), 0) + << "WorkerPoolTaskRunner does not support non-zero delays"; + return WorkerPool::PostTask(from_here, task, tasks_are_slow_); +} + +struct TaskRunnerHolder { + TaskRunnerHolder() { + taskrunners_[0] = new WorkerPoolTaskRunner(false); + taskrunners_[1] = new WorkerPoolTaskRunner(true); + } + scoped_refptr<TaskRunner> taskrunners_[2]; +}; + +base::LazyInstance<TaskRunnerHolder>::Leaky + g_taskrunners = LAZY_INSTANCE_INITIALIZER; + +} // namespace + +bool WorkerPool::PostTaskAndReply(const tracked_objects::Location& from_here, + const Closure& task, + const Closure& reply, + bool task_is_slow) { + return PostTaskAndReplyWorkerPool(task_is_slow).PostTaskAndReply( + from_here, task, reply); +} + +// static +const scoped_refptr<TaskRunner>& +WorkerPool::GetTaskRunner(bool tasks_are_slow) { + return g_taskrunners.Get().taskrunners_[tasks_are_slow]; +} + +} // namespace base diff --git a/chromium/base/threading/worker_pool.h b/chromium/base/threading/worker_pool.h new file mode 100644 index 00000000000..333b4950f6b --- /dev/null +++ b/chromium/base/threading/worker_pool.h @@ -0,0 +1,60 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef BASE_THREADING_WORKER_POOL_H_ +#define BASE_THREADING_WORKER_POOL_H_ + +#include "base/base_export.h" +#include "base/callback_forward.h" +#include "base/memory/ref_counted.h" + +class Task; + +namespace tracked_objects { +class Location; +} // namespace tracked_objects + +namespace base { + +class TaskRunner; + +// This is a facility that runs tasks that don't require a specific thread or +// a message loop. +// +// WARNING: This shouldn't be used unless absolutely necessary. We don't wait +// for the worker pool threads to finish on shutdown, so the tasks running +// inside the pool must be extremely careful about other objects they access +// (MessageLoops, Singletons, etc). During shutdown these object may no longer +// exist. +class BASE_EXPORT WorkerPool { + public: + // This function posts |task| to run on a worker thread. |task_is_slow| + // should be used for tasks that will take a long time to execute. Returns + // false if |task| could not be posted to a worker thread. Regardless of + // return value, ownership of |task| is transferred to the worker pool. + static bool PostTask(const tracked_objects::Location& from_here, + const base::Closure& task, bool task_is_slow); + + // Just like MessageLoopProxy::PostTaskAndReply, except the destination + // for |task| is a worker thread and you can specify |task_is_slow| just + // like you can for PostTask above. + static bool PostTaskAndReply(const tracked_objects::Location& from_here, + const Closure& task, + const Closure& reply, + bool task_is_slow); + + // Return true if the current thread is one that this WorkerPool runs tasks + // on. (Note that if the Windows worker pool is used without going through + // this WorkerPool interface, RunsTasksOnCurrentThread would return false on + // those threads.) + static bool RunsTasksOnCurrentThread(); + + // Get a TaskRunner wrapper which posts to the WorkerPool using the given + // |task_is_slow| behavior. + static const scoped_refptr<TaskRunner>& GetTaskRunner(bool task_is_slow); +}; + +} // namespace base + +#endif // BASE_THREADING_WORKER_POOL_H_ diff --git a/chromium/base/threading/worker_pool_posix.cc b/chromium/base/threading/worker_pool_posix.cc new file mode 100644 index 00000000000..c4c523f270d --- /dev/null +++ b/chromium/base/threading/worker_pool_posix.cc @@ -0,0 +1,202 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/worker_pool_posix.h" + +#include "base/bind.h" +#include "base/callback.h" +#include "base/debug/trace_event.h" +#include "base/lazy_instance.h" +#include "base/logging.h" +#include "base/memory/ref_counted.h" +#include "base/strings/stringprintf.h" +#include "base/threading/platform_thread.h" +#include "base/threading/thread_local.h" +#include "base/threading/worker_pool.h" +#include "base/tracked_objects.h" + +using tracked_objects::TrackedTime; + +namespace base { + +namespace { + +base::LazyInstance<ThreadLocalBoolean>::Leaky + g_worker_pool_running_on_this_thread = LAZY_INSTANCE_INITIALIZER; + +const int kIdleSecondsBeforeExit = 10 * 60; + +#ifdef ADDRESS_SANITIZER +const int kWorkerThreadStackSize = 256 * 1024; +#else +// A stack size of 64 KB is too small for the CERT_PKIXVerifyCert +// function of NSS because of NSS bug 439169. +const int kWorkerThreadStackSize = 128 * 1024; +#endif + +class WorkerPoolImpl { + public: + WorkerPoolImpl(); + ~WorkerPoolImpl(); + + void PostTask(const tracked_objects::Location& from_here, + const base::Closure& task, bool task_is_slow); + + private: + scoped_refptr<base::PosixDynamicThreadPool> pool_; +}; + +WorkerPoolImpl::WorkerPoolImpl() + : pool_(new base::PosixDynamicThreadPool("WorkerPool", + kIdleSecondsBeforeExit)) { +} + +WorkerPoolImpl::~WorkerPoolImpl() { + pool_->Terminate(); +} + +void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here, + const base::Closure& task, bool task_is_slow) { + pool_->PostTask(from_here, task); +} + +base::LazyInstance<WorkerPoolImpl> g_lazy_worker_pool = + LAZY_INSTANCE_INITIALIZER; + +class WorkerThread : public PlatformThread::Delegate { + public: + WorkerThread(const std::string& name_prefix, + base::PosixDynamicThreadPool* pool) + : name_prefix_(name_prefix), + pool_(pool) {} + + virtual void ThreadMain() OVERRIDE; + + private: + const std::string name_prefix_; + scoped_refptr<base::PosixDynamicThreadPool> pool_; + + DISALLOW_COPY_AND_ASSIGN(WorkerThread); +}; + +void WorkerThread::ThreadMain() { + g_worker_pool_running_on_this_thread.Get().Set(true); + const std::string name = base::StringPrintf( + "%s/%d", name_prefix_.c_str(), PlatformThread::CurrentId()); + // Note |name.c_str()| must remain valid for for the whole life of the thread. + PlatformThread::SetName(name.c_str()); + + for (;;) { + PendingTask pending_task = pool_->WaitForTask(); + if (pending_task.task.is_null()) + break; + TRACE_EVENT2("task", "WorkerThread::ThreadMain::Run", + "src_file", pending_task.posted_from.file_name(), + "src_func", pending_task.posted_from.function_name()); + + TrackedTime start_time = + tracked_objects::ThreadData::NowForStartOfRun(pending_task.birth_tally); + + pending_task.task.Run(); + + tracked_objects::ThreadData::TallyRunOnWorkerThreadIfTracking( + pending_task.birth_tally, TrackedTime(pending_task.time_posted), + start_time, tracked_objects::ThreadData::NowForEndOfRun()); + } + + // The WorkerThread is non-joinable, so it deletes itself. + delete this; +} + +} // namespace + +// static +bool WorkerPool::PostTask(const tracked_objects::Location& from_here, + const base::Closure& task, bool task_is_slow) { + g_lazy_worker_pool.Pointer()->PostTask(from_here, task, task_is_slow); + return true; +} + +// static +bool WorkerPool::RunsTasksOnCurrentThread() { + return g_worker_pool_running_on_this_thread.Get().Get(); +} + +PosixDynamicThreadPool::PosixDynamicThreadPool(const std::string& name_prefix, + int idle_seconds_before_exit) + : name_prefix_(name_prefix), + idle_seconds_before_exit_(idle_seconds_before_exit), + pending_tasks_available_cv_(&lock_), + num_idle_threads_(0), + terminated_(false) {} + +PosixDynamicThreadPool::~PosixDynamicThreadPool() { + while (!pending_tasks_.empty()) + pending_tasks_.pop(); +} + +void PosixDynamicThreadPool::Terminate() { + { + AutoLock locked(lock_); + DCHECK(!terminated_) << "Thread pool is already terminated."; + terminated_ = true; + } + pending_tasks_available_cv_.Broadcast(); +} + +void PosixDynamicThreadPool::PostTask( + const tracked_objects::Location& from_here, + const base::Closure& task) { + PendingTask pending_task(from_here, task); + AddTask(&pending_task); +} + +void PosixDynamicThreadPool::AddTask(PendingTask* pending_task) { + AutoLock locked(lock_); + DCHECK(!terminated_) << + "This thread pool is already terminated. Do not post new tasks."; + + pending_tasks_.push(*pending_task); + pending_task->task.Reset(); + + // We have enough worker threads. + if (static_cast<size_t>(num_idle_threads_) >= pending_tasks_.size()) { + pending_tasks_available_cv_.Signal(); + } else { + // The new PlatformThread will take ownership of the WorkerThread object, + // which will delete itself on exit. + WorkerThread* worker = + new WorkerThread(name_prefix_, this); + PlatformThread::CreateNonJoinable(kWorkerThreadStackSize, worker); + } +} + +PendingTask PosixDynamicThreadPool::WaitForTask() { + AutoLock locked(lock_); + + if (terminated_) + return PendingTask(FROM_HERE, base::Closure()); + + if (pending_tasks_.empty()) { // No work available, wait for work. + num_idle_threads_++; + if (num_idle_threads_cv_.get()) + num_idle_threads_cv_->Signal(); + pending_tasks_available_cv_.TimedWait( + TimeDelta::FromSeconds(idle_seconds_before_exit_)); + num_idle_threads_--; + if (num_idle_threads_cv_.get()) + num_idle_threads_cv_->Signal(); + if (pending_tasks_.empty()) { + // We waited for work, but there's still no work. Return NULL to signal + // the thread to terminate. + return PendingTask(FROM_HERE, base::Closure()); + } + } + + PendingTask pending_task = pending_tasks_.front(); + pending_tasks_.pop(); + return pending_task; +} + +} // namespace base diff --git a/chromium/base/threading/worker_pool_posix.h b/chromium/base/threading/worker_pool_posix.h new file mode 100644 index 00000000000..dd0ffb656fa --- /dev/null +++ b/chromium/base/threading/worker_pool_posix.h @@ -0,0 +1,98 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +// +// The thread pool used in the POSIX implementation of WorkerPool dynamically +// adds threads as necessary to handle all tasks. It keeps old threads around +// for a period of time to allow them to be reused. After this waiting period, +// the threads exit. This thread pool uses non-joinable threads, therefore +// worker threads are not joined during process shutdown. This means that +// potentially long running tasks (such as DNS lookup) do not block process +// shutdown, but also means that process shutdown may "leak" objects. Note that +// although PosixDynamicThreadPool spawns the worker threads and manages the +// task queue, it does not own the worker threads. The worker threads ask the +// PosixDynamicThreadPool for work and eventually clean themselves up. The +// worker threads all maintain scoped_refptrs to the PosixDynamicThreadPool +// instance, which prevents PosixDynamicThreadPool from disappearing before all +// worker threads exit. The owner of PosixDynamicThreadPool should likewise +// maintain a scoped_refptr to the PosixDynamicThreadPool instance. +// +// NOTE: The classes defined in this file are only meant for use by the POSIX +// implementation of WorkerPool. No one else should be using these classes. +// These symbols are exported in a header purely for testing purposes. + +#ifndef BASE_THREADING_WORKER_POOL_POSIX_H_ +#define BASE_THREADING_WORKER_POOL_POSIX_H_ + +#include <queue> +#include <string> + +#include "base/basictypes.h" +#include "base/callback_forward.h" +#include "base/location.h" +#include "base/memory/ref_counted.h" +#include "base/memory/scoped_ptr.h" +#include "base/pending_task.h" +#include "base/synchronization/condition_variable.h" +#include "base/synchronization/lock.h" +#include "base/threading/platform_thread.h" +#include "base/tracked_objects.h" + +class Task; + +namespace base { + +class BASE_EXPORT PosixDynamicThreadPool + : public RefCountedThreadSafe<PosixDynamicThreadPool> { + public: + class PosixDynamicThreadPoolPeer; + + // All worker threads will share the same |name_prefix|. They will exit after + // |idle_seconds_before_exit|. + PosixDynamicThreadPool(const std::string& name_prefix, + int idle_seconds_before_exit); + + // Indicates that the thread pool is going away. Stops handing out tasks to + // worker threads. Wakes up all the idle threads to let them exit. + void Terminate(); + + // Adds |task| to the thread pool. + void PostTask(const tracked_objects::Location& from_here, + const Closure& task); + + // Worker thread method to wait for up to |idle_seconds_before_exit| for more + // work from the thread pool. Returns NULL if no work is available. + PendingTask WaitForTask(); + + private: + friend class RefCountedThreadSafe<PosixDynamicThreadPool>; + friend class PosixDynamicThreadPoolPeer; + + ~PosixDynamicThreadPool(); + + // Adds pending_task to the thread pool. This function will clear + // |pending_task->task|. + void AddTask(PendingTask* pending_task); + + const std::string name_prefix_; + const int idle_seconds_before_exit_; + + Lock lock_; // Protects all the variables below. + + // Signal()s worker threads to let them know more tasks are available. + // Also used for Broadcast()'ing to worker threads to let them know the pool + // is being deleted and they can exit. + ConditionVariable pending_tasks_available_cv_; + int num_idle_threads_; + TaskQueue pending_tasks_; + bool terminated_; + // Only used for tests to ensure correct thread ordering. It will always be + // NULL in non-test code. + scoped_ptr<ConditionVariable> num_idle_threads_cv_; + + DISALLOW_COPY_AND_ASSIGN(PosixDynamicThreadPool); +}; + +} // namespace base + +#endif // BASE_THREADING_WORKER_POOL_POSIX_H_ diff --git a/chromium/base/threading/worker_pool_posix_unittest.cc b/chromium/base/threading/worker_pool_posix_unittest.cc new file mode 100644 index 00000000000..49f6570aa4e --- /dev/null +++ b/chromium/base/threading/worker_pool_posix_unittest.cc @@ -0,0 +1,254 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/worker_pool_posix.h" + +#include <set> + +#include "base/bind.h" +#include "base/callback.h" +#include "base/synchronization/condition_variable.h" +#include "base/synchronization/lock.h" +#include "base/threading/platform_thread.h" +#include "base/synchronization/waitable_event.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace base { + +// Peer class to provide passthrough access to PosixDynamicThreadPool internals. +class PosixDynamicThreadPool::PosixDynamicThreadPoolPeer { + public: + explicit PosixDynamicThreadPoolPeer(PosixDynamicThreadPool* pool) + : pool_(pool) {} + + Lock* lock() { return &pool_->lock_; } + ConditionVariable* pending_tasks_available_cv() { + return &pool_->pending_tasks_available_cv_; + } + const std::queue<PendingTask>& pending_tasks() const { + return pool_->pending_tasks_; + } + int num_idle_threads() const { return pool_->num_idle_threads_; } + ConditionVariable* num_idle_threads_cv() { + return pool_->num_idle_threads_cv_.get(); + } + void set_num_idle_threads_cv(ConditionVariable* cv) { + pool_->num_idle_threads_cv_.reset(cv); + } + + private: + PosixDynamicThreadPool* pool_; + + DISALLOW_COPY_AND_ASSIGN(PosixDynamicThreadPoolPeer); +}; + +namespace { + +// IncrementingTask's main purpose is to increment a counter. It also updates a +// set of unique thread ids, and signals a ConditionVariable on completion. +// Note that since it does not block, there is no way to control the number of +// threads used if more than one IncrementingTask is consecutively posted to the +// thread pool, since the first one might finish executing before the subsequent +// PostTask() calls get invoked. +void IncrementingTask(Lock* counter_lock, + int* counter, + Lock* unique_threads_lock, + std::set<PlatformThreadId>* unique_threads) { + { + base::AutoLock locked(*unique_threads_lock); + unique_threads->insert(PlatformThread::CurrentId()); + } + base::AutoLock locked(*counter_lock); + (*counter)++; +} + +// BlockingIncrementingTask is a simple wrapper around IncrementingTask that +// allows for waiting at the start of Run() for a WaitableEvent to be signalled. +struct BlockingIncrementingTaskArgs { + Lock* counter_lock; + int* counter; + Lock* unique_threads_lock; + std::set<PlatformThreadId>* unique_threads; + Lock* num_waiting_to_start_lock; + int* num_waiting_to_start; + ConditionVariable* num_waiting_to_start_cv; + base::WaitableEvent* start; +}; + +void BlockingIncrementingTask(const BlockingIncrementingTaskArgs& args) { + { + base::AutoLock num_waiting_to_start_locked(*args.num_waiting_to_start_lock); + (*args.num_waiting_to_start)++; + } + args.num_waiting_to_start_cv->Signal(); + args.start->Wait(); + IncrementingTask(args.counter_lock, args.counter, args.unique_threads_lock, + args.unique_threads); +} + +class PosixDynamicThreadPoolTest : public testing::Test { + protected: + PosixDynamicThreadPoolTest() + : pool_(new base::PosixDynamicThreadPool("dynamic_pool", 60*60)), + peer_(pool_.get()), + counter_(0), + num_waiting_to_start_(0), + num_waiting_to_start_cv_(&num_waiting_to_start_lock_), + start_(true, false) {} + + virtual void SetUp() OVERRIDE { + peer_.set_num_idle_threads_cv(new ConditionVariable(peer_.lock())); + } + + virtual void TearDown() OVERRIDE { + // Wake up the idle threads so they can terminate. + if (pool_.get()) pool_->Terminate(); + } + + void WaitForTasksToStart(int num_tasks) { + base::AutoLock num_waiting_to_start_locked(num_waiting_to_start_lock_); + while (num_waiting_to_start_ < num_tasks) { + num_waiting_to_start_cv_.Wait(); + } + } + + void WaitForIdleThreads(int num_idle_threads) { + base::AutoLock pool_locked(*peer_.lock()); + while (peer_.num_idle_threads() < num_idle_threads) { + peer_.num_idle_threads_cv()->Wait(); + } + } + + base::Closure CreateNewIncrementingTaskCallback() { + return base::Bind(&IncrementingTask, &counter_lock_, &counter_, + &unique_threads_lock_, &unique_threads_); + } + + base::Closure CreateNewBlockingIncrementingTaskCallback() { + BlockingIncrementingTaskArgs args = { + &counter_lock_, &counter_, &unique_threads_lock_, &unique_threads_, + &num_waiting_to_start_lock_, &num_waiting_to_start_, + &num_waiting_to_start_cv_, &start_ + }; + return base::Bind(&BlockingIncrementingTask, args); + } + + scoped_refptr<base::PosixDynamicThreadPool> pool_; + base::PosixDynamicThreadPool::PosixDynamicThreadPoolPeer peer_; + Lock counter_lock_; + int counter_; + Lock unique_threads_lock_; + std::set<PlatformThreadId> unique_threads_; + Lock num_waiting_to_start_lock_; + int num_waiting_to_start_; + ConditionVariable num_waiting_to_start_cv_; + base::WaitableEvent start_; +}; + +} // namespace + +TEST_F(PosixDynamicThreadPoolTest, Basic) { + EXPECT_EQ(0, peer_.num_idle_threads()); + EXPECT_EQ(0U, unique_threads_.size()); + EXPECT_EQ(0U, peer_.pending_tasks().size()); + + // Add one task and wait for it to be completed. + pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback()); + + WaitForIdleThreads(1); + + EXPECT_EQ(1U, unique_threads_.size()) << + "There should be only one thread allocated for one task."; + EXPECT_EQ(1, peer_.num_idle_threads()); + EXPECT_EQ(1, counter_); +} + +TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) { + // Add one task and wait for it to be completed. + pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback()); + + WaitForIdleThreads(1); + + // Add another 2 tasks. One should reuse the existing worker thread. + pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback()); + pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback()); + + WaitForTasksToStart(2); + start_.Signal(); + WaitForIdleThreads(2); + + EXPECT_EQ(2U, unique_threads_.size()); + EXPECT_EQ(2, peer_.num_idle_threads()); + EXPECT_EQ(3, counter_); +} + +TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) { + // Add two blocking tasks. + pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback()); + pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback()); + + EXPECT_EQ(0, counter_) << "Blocking tasks should not have started yet."; + + WaitForTasksToStart(2); + start_.Signal(); + WaitForIdleThreads(2); + + EXPECT_EQ(2U, unique_threads_.size()); + EXPECT_EQ(2, peer_.num_idle_threads()) << "Existing threads are now idle."; + EXPECT_EQ(2, counter_); +} + +TEST_F(PosixDynamicThreadPoolTest, Complex) { + // Add two non blocking tasks and wait for them to finish. + pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback()); + + WaitForIdleThreads(1); + + // Add two blocking tasks, start them simultaneously, and wait for them to + // finish. + pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback()); + pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback()); + + WaitForTasksToStart(2); + start_.Signal(); + WaitForIdleThreads(2); + + EXPECT_EQ(3, counter_); + EXPECT_EQ(2, peer_.num_idle_threads()); + EXPECT_EQ(2U, unique_threads_.size()); + + // Wake up all idle threads so they can exit. + { + base::AutoLock locked(*peer_.lock()); + while (peer_.num_idle_threads() > 0) { + peer_.pending_tasks_available_cv()->Signal(); + peer_.num_idle_threads_cv()->Wait(); + } + } + + // Add another non blocking task. There are no threads to reuse. + pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback()); + WaitForIdleThreads(1); + + // The POSIX implementation of PlatformThread::CurrentId() uses pthread_self() + // which is not guaranteed to be unique after a thread joins. The OS X + // implemntation of pthread_self() returns the address of the pthread_t, which + // is merely a malloc()ed pointer stored in the first TLS slot. When a thread + // joins and that structure is freed, the block of memory can be put on the + // OS free list, meaning the same address could be reused in a subsequent + // allocation. This in fact happens when allocating in a loop as this test + // does. + // + // Because there are two concurrent threads, there's at least the guarantee + // of having two unique thread IDs in the set. But after those two threads are + // joined, the next-created thread can get a re-used ID if the allocation of + // the pthread_t structure is taken from the free list. Therefore, there can + // be either 2 or 3 unique thread IDs in the set at this stage in the test. + EXPECT_TRUE(unique_threads_.size() >= 2 && unique_threads_.size() <= 3) + << "unique_threads_.size() = " << unique_threads_.size(); + EXPECT_EQ(1, peer_.num_idle_threads()); + EXPECT_EQ(4, counter_); +} + +} // namespace base diff --git a/chromium/base/threading/worker_pool_unittest.cc b/chromium/base/threading/worker_pool_unittest.cc new file mode 100644 index 00000000000..9a9ab951b9e --- /dev/null +++ b/chromium/base/threading/worker_pool_unittest.cc @@ -0,0 +1,112 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/worker_pool.h" + +#include "base/bind.h" +#include "base/bind_helpers.h" +#include "base/location.h" +#include "base/message_loop/message_loop.h" +#include "base/run_loop.h" +#include "base/synchronization/waitable_event.h" +#include "base/test/test_timeouts.h" +#include "base/threading/thread_checker_impl.h" +#include "base/time/time.h" +#include "testing/gtest/include/gtest/gtest.h" +#include "testing/platform_test.h" + +typedef PlatformTest WorkerPoolTest; + +namespace base { + +namespace { + +class PostTaskAndReplyTester + : public base::RefCountedThreadSafe<PostTaskAndReplyTester> { + public: + PostTaskAndReplyTester() : finished_(false), test_event_(false, false) {} + + void RunTest() { + ASSERT_TRUE(thread_checker_.CalledOnValidThread()); + WorkerPool::PostTaskAndReply( + FROM_HERE, + base::Bind(&PostTaskAndReplyTester::OnWorkerThread, this), + base::Bind(&PostTaskAndReplyTester::OnOriginalThread, this), + false); + + test_event_.Wait(); + } + + void OnWorkerThread() { + // We're not on the original thread. + EXPECT_FALSE(thread_checker_.CalledOnValidThread()); + + test_event_.Signal(); + } + + void OnOriginalThread() { + EXPECT_TRUE(thread_checker_.CalledOnValidThread()); + finished_ = true; + } + + bool finished() const { + return finished_; + } + + private: + friend class base::RefCountedThreadSafe<PostTaskAndReplyTester>; + ~PostTaskAndReplyTester() {} + + bool finished_; + WaitableEvent test_event_; + + // The Impl version performs its checks even in release builds. + ThreadCheckerImpl thread_checker_; +}; + +} // namespace + +TEST_F(WorkerPoolTest, PostTask) { + WaitableEvent test_event(false, false); + WaitableEvent long_test_event(false, false); + + WorkerPool::PostTask(FROM_HERE, + base::Bind(&WaitableEvent::Signal, + base::Unretained(&test_event)), + false); + WorkerPool::PostTask(FROM_HERE, + base::Bind(&WaitableEvent::Signal, + base::Unretained(&long_test_event)), + true); + + test_event.Wait(); + long_test_event.Wait(); +} + +#if defined(OS_WIN) || defined(OS_LINUX) +// Flaky on Windows and Linux (http://crbug.com/130337) +#define MAYBE_PostTaskAndReply DISABLED_PostTaskAndReply +#else +#define MAYBE_PostTaskAndReply PostTaskAndReply +#endif + +TEST_F(WorkerPoolTest, MAYBE_PostTaskAndReply) { + MessageLoop message_loop; + scoped_refptr<PostTaskAndReplyTester> tester(new PostTaskAndReplyTester()); + tester->RunTest(); + + const TimeDelta kMaxDuration = TestTimeouts::tiny_timeout(); + TimeTicks start = TimeTicks::Now(); + while (!tester->finished() && TimeTicks::Now() - start < kMaxDuration) { +#if defined(OS_IOS) + // Ensure that the other thread has a chance to run even on a single-core + // device. + pthread_yield_np(); +#endif + RunLoop().RunUntilIdle(); + } + EXPECT_TRUE(tester->finished()); +} + +} // namespace base diff --git a/chromium/base/threading/worker_pool_win.cc b/chromium/base/threading/worker_pool_win.cc new file mode 100644 index 00000000000..c27010fb12f --- /dev/null +++ b/chromium/base/threading/worker_pool_win.cc @@ -0,0 +1,73 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/threading/worker_pool.h" + +#include "base/bind.h" +#include "base/callback.h" +#include "base/debug/trace_event.h" +#include "base/logging.h" +#include "base/pending_task.h" +#include "base/threading/thread_local.h" +#include "base/tracked_objects.h" + +namespace base { + +namespace { + +base::LazyInstance<ThreadLocalBoolean>::Leaky + g_worker_pool_running_on_this_thread = LAZY_INSTANCE_INITIALIZER; + +DWORD CALLBACK WorkItemCallback(void* param) { + PendingTask* pending_task = static_cast<PendingTask*>(param); + TRACE_EVENT2("task", "WorkItemCallback::Run", + "src_file", pending_task->posted_from.file_name(), + "src_func", pending_task->posted_from.function_name()); + + tracked_objects::TrackedTime start_time = + tracked_objects::ThreadData::NowForStartOfRun(pending_task->birth_tally); + + g_worker_pool_running_on_this_thread.Get().Set(true); + pending_task->task.Run(); + g_worker_pool_running_on_this_thread.Get().Set(false); + + tracked_objects::ThreadData::TallyRunOnWorkerThreadIfTracking( + pending_task->birth_tally, + tracked_objects::TrackedTime(pending_task->time_posted), start_time, + tracked_objects::ThreadData::NowForEndOfRun()); + + delete pending_task; + return 0; +} + +// Takes ownership of |pending_task| +bool PostTaskInternal(PendingTask* pending_task, bool task_is_slow) { + ULONG flags = 0; + if (task_is_slow) + flags |= WT_EXECUTELONGFUNCTION; + + if (!QueueUserWorkItem(WorkItemCallback, pending_task, flags)) { + DLOG_GETLASTERROR(ERROR) << "QueueUserWorkItem failed"; + delete pending_task; + return false; + } + + return true; +} + +} // namespace + +// static +bool WorkerPool::PostTask(const tracked_objects::Location& from_here, + const base::Closure& task, bool task_is_slow) { + PendingTask* pending_task = new PendingTask(from_here, task); + return PostTaskInternal(pending_task, task_is_slow); +} + +// static +bool WorkerPool::RunsTasksOnCurrentThread() { + return g_worker_pool_running_on_this_thread.Get().Get(); +} + +} // namespace base |