diff options
-rwxr-xr-x | appveyor.yml | 2 | ||||
-rw-r--r-- | build/docker/centos-7.3/Dockerfile | 6 | ||||
-rw-r--r-- | build/docker/debian-stretch/Dockerfile | 1 | ||||
-rw-r--r-- | build/docker/ubuntu-xenial/Dockerfile | 1 | ||||
-rw-r--r-- | lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp | 46 | ||||
-rw-r--r-- | lib/cpp/src/thrift/concurrency/BoostThreadFactory.h | 2 | ||||
-rw-r--r-- | lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp | 35 | ||||
-rw-r--r-- | lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp | 50 | ||||
-rw-r--r-- | lib/cpp/src/thrift/transport/TServerSocket.cpp | 11 | ||||
-rw-r--r-- | lib/cpp/src/thrift/transport/TServerSocket.h | 2 | ||||
-rw-r--r-- | lib/cpp/test/concurrency/Tests.cpp | 25 | ||||
-rw-r--r-- | lib/cpp/test/concurrency/ThreadFactoryTests.h | 24 | ||||
-rw-r--r-- | lib/cpp/test/concurrency/ThreadManagerTests.h | 2 | ||||
-rw-r--r-- | test/valgrind.suppress | 44 |
14 files changed, 180 insertions, 71 deletions
diff --git a/appveyor.yml b/appveyor.yml index fc09f87f9..4c2e36496 100755 --- a/appveyor.yml +++ b/appveyor.yml @@ -40,7 +40,7 @@ environment: LIBEVENT_VERSION: 2.0.22 QT_VERSION: 5.6 ZLIB_VERSION: 1.2.8 - DISABLED_TESTS: StressTestNonBlocking|concurrency_test + DISABLED_TESTS: StressTestNonBlocking - PROFILE: MSVC2015 PLATFORM: x64 diff --git a/build/docker/centos-7.3/Dockerfile b/build/docker/centos-7.3/Dockerfile index f79939c72..096bbaa45 100644 --- a/build/docker/centos-7.3/Dockerfile +++ b/build/docker/centos-7.3/Dockerfile @@ -10,11 +10,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -# Apache Thrift Docker build environment for Centos +# Apache Thrift Docker build environment for CentOS # # Known missing client libraries: # - dotnet (will update to 2.0.0 separately) -# - haxe (not in debian stretch) +# - haxe (not in centos) FROM centos:7.3.1611 MAINTAINER Apache Thrift <dev@thrift.apache.org> @@ -33,12 +33,14 @@ RUN yum install -y \ flex \ gcc \ gcc-c++ \ + gdb \ git \ libtool \ m4 \ make \ tar \ unzip \ + valgrind \ wget && \ ln -s /usr/bin/cmake3 /usr/bin/cmake && \ ln -s /usr/bin/cpack3 /usr/bin/cpack && \ diff --git a/build/docker/debian-stretch/Dockerfile b/build/docker/debian-stretch/Dockerfile index 70309fbe0..503eecd42 100644 --- a/build/docker/debian-stretch/Dockerfile +++ b/build/docker/debian-stretch/Dockerfile @@ -56,6 +56,7 @@ RUN apt-get install -y --no-install-recommends \ gdb \ ninja-build \ pkg-config \ + valgrind \ vim diff --git a/build/docker/ubuntu-xenial/Dockerfile b/build/docker/ubuntu-xenial/Dockerfile index 6bad6a94c..6324ec238 100644 --- a/build/docker/ubuntu-xenial/Dockerfile +++ b/build/docker/ubuntu-xenial/Dockerfile @@ -60,6 +60,7 @@ RUN apt-get install -y --no-install-recommends \ llvm \ ninja-build \ pkg-config \ + valgrind \ vim ENV PATH /usr/lib/llvm-3.8/bin:$PATH diff --git a/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp b/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp index 3661275e8..d7d8d54e9 100644 --- a/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp +++ b/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp @@ -51,6 +51,7 @@ public: private: scoped_ptr<boost::thread> thread_; + Monitor monitor_; STATE state_; weak_ptr<BoostThread> self_; bool detached_; @@ -71,25 +72,46 @@ public: } } - void start() { - if (state_ != uninitialized) { - return; + STATE getState() const + { + Synchronized sync(monitor_); + return state_; + } + + void setState(STATE newState) + { + Synchronized sync(monitor_); + state_ = newState; + + // unblock start() with the knowledge that the thread has actually + // started running, which avoids a race in detached threads. + if (newState == started) { + monitor_.notify(); } + } + void start() { // Create reference shared_ptr<BoostThread>* selfRef = new shared_ptr<BoostThread>(); *selfRef = self_.lock(); - state_ = starting; + setState(starting); + Synchronized sync(monitor_); + thread_.reset(new boost::thread(bind(threadMain, (void*)selfRef))); if (detached_) thread_->detach(); + + // Wait for the thread to start and get far enough to grab everything + // that it needs from the calling context, thus absolving the caller + // from being required to hold on to runnable indefinitely. + monitor_.wait(); } void join() { - if (!detached_ && state_ != uninitialized) { + if (!detached_ && getState() != uninitialized) { thread_->join(); } } @@ -110,19 +132,11 @@ void* BoostThread::threadMain(void* arg) { shared_ptr<BoostThread> thread = *(shared_ptr<BoostThread>*)arg; delete reinterpret_cast<shared_ptr<BoostThread>*>(arg); - if (!thread) { - return (void*)0; - } - - if (thread->state_ != starting) { - return (void*)0; - } - - thread->state_ = started; + thread->setState(started); thread->runnable()->run(); - if (thread->state_ != stopping && thread->state_ != stopped) { - thread->state_ = stopping; + if (thread->getState() != stopping && thread->getState() != stopped) { + thread->setState(stopping); } return (void*)0; } diff --git a/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h b/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h index 4c7a45abb..bf11a708b 100644 --- a/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h +++ b/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h @@ -20,8 +20,8 @@ #ifndef _THRIFT_CONCURRENCY_BOOSTTHREADFACTORY_H_ #define _THRIFT_CONCURRENCY_BOOSTTHREADFACTORY_H_ 1 +#include <thrift/concurrency/Monitor.h> #include <thrift/concurrency/Thread.h> - #include <thrift/stdcxx.h> namespace apache { diff --git a/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp b/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp index d829d6905..2e35446b5 100644 --- a/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp +++ b/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp @@ -20,7 +20,7 @@ #include <thrift/thrift-config.h> #include <thrift/concurrency/Exception.h> -#include <thrift/concurrency/Mutex.h> +#include <thrift/concurrency/Monitor.h> #include <thrift/concurrency/PosixThreadFactory.h> #if GOOGLE_PERFTOOLS_REGISTER_THREAD @@ -53,8 +53,8 @@ public: private: pthread_t pthread_; - Mutex state_mutex_; - STATE state_; + Monitor monitor_; // guard to protect state_ and also notification + STATE state_; // to protect proper thread start behavior int policy_; int priority_; int stackSize_; @@ -96,14 +96,20 @@ public: STATE getState() const { - Guard g(state_mutex_); + Synchronized sync(monitor_); return state_; } void setState(STATE newState) { - Guard g(state_mutex_); + Synchronized sync(monitor_); state_ = newState; + + // unblock start() with the knowledge that the thread has actually + // started running, which avoids a race in detached threads. + if (newState == started) { + monitor_.notify(); + } } void start() { @@ -154,9 +160,18 @@ public: setState(starting); + Synchronized sync(monitor_); + if (pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) != 0) { throw SystemResourceException("pthread_create failed"); } + + // The caller may not choose to guarantee the scope of the Runnable + // being used in the thread, so we must actually wait until the thread + // starts before we return. If we do not wait, it would be possible + // for the caller to start destructing the Runnable and the Thread, + // and we would end up in a race. This was identified with valgrind. + monitor_.wait(); } void join() { @@ -174,8 +189,6 @@ public: if (res != 0) { GlobalOutput.printf("PthreadThread::join(): fail with code %d", res); } - } else { - GlobalOutput.printf("PthreadThread::join(): detached thread"); } } @@ -202,14 +215,6 @@ void* PthreadThread::threadMain(void* arg) { stdcxx::shared_ptr<PthreadThread> thread = *(stdcxx::shared_ptr<PthreadThread>*)arg; delete reinterpret_cast<stdcxx::shared_ptr<PthreadThread>*>(arg); - if (thread == NULL) { - return (void*)0; - } - - if (thread->getState() != starting) { - return (void*)0; - } - #if GOOGLE_PERFTOOLS_REGISTER_THREAD ProfilerRegisterThread(); #endif diff --git a/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp b/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp index 4067f2418..da0c5e373 100644 --- a/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp +++ b/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp @@ -21,8 +21,9 @@ #if USE_STD_THREAD -#include <thrift/concurrency/StdThreadFactory.h> #include <thrift/concurrency/Exception.h> +#include <thrift/concurrency/Monitor.h> +#include <thrift/concurrency/StdThreadFactory.h> #include <thrift/stdcxx.h> #include <cassert> @@ -49,6 +50,7 @@ public: private: std::unique_ptr<std::thread> thread_; + Monitor monitor_; STATE state_; bool detached_; @@ -68,18 +70,42 @@ public: } } + STATE getState() const + { + Synchronized sync(monitor_); + return state_; + } + + void setState(STATE newState) + { + Synchronized sync(monitor_); + state_ = newState; + + // unblock start() with the knowledge that the thread has actually + // started running, which avoids a race in detached threads. + if (newState == started) { + monitor_.notify(); + } + } + void start() { - if (state_ != uninitialized) { + if (getState() != uninitialized) { return; } stdcxx::shared_ptr<StdThread> selfRef = shared_from_this(); - state_ = starting; + setState(starting); + Synchronized sync(monitor_); thread_ = std::unique_ptr<std::thread>(new std::thread(threadMain, selfRef)); if (detached_) thread_->detach(); + + // Wait for the thread to start and get far enough to grab everything + // that it needs from the calling context, thus absolving the caller + // from being required to hold on to runnable indefinitely. + monitor_.wait(); } void join() { @@ -96,22 +122,16 @@ public: }; void StdThread::threadMain(stdcxx::shared_ptr<StdThread> thread) { - if (thread == NULL) { - return; - } +#if GOOGLE_PERFTOOLS_REGISTER_THREAD + ProfilerRegisterThread(); +#endif - if (thread->state_ != starting) { - return; - } - - thread->state_ = started; + thread->setState(started); thread->runnable()->run(); - if (thread->state_ != stopping && thread->state_ != stopped) { - thread->state_ = stopping; + if (thread->getState() != stopping && thread->getState() != stopped) { + thread->setState(stopping); } - - return; } StdThreadFactory::StdThreadFactory(bool detached) : ThreadFactory(detached) { diff --git a/lib/cpp/src/thrift/transport/TServerSocket.cpp b/lib/cpp/src/thrift/transport/TServerSocket.cpp index a704b068c..3179b1aec 100644 --- a/lib/cpp/src/thrift/transport/TServerSocket.cpp +++ b/lib/cpp/src/thrift/transport/TServerSocket.cpp @@ -658,14 +658,21 @@ void TServerSocket::notify(THRIFT_SOCKET notifySocket) { } void TServerSocket::interrupt() { - notify(interruptSockWriter_); + concurrency::Guard g(rwMutex_); + if (interruptSockWriter_ != THRIFT_INVALID_SOCKET) { + notify(interruptSockWriter_); + } } void TServerSocket::interruptChildren() { - notify(childInterruptSockWriter_); + concurrency::Guard g(rwMutex_); + if (childInterruptSockWriter_ != THRIFT_INVALID_SOCKET) { + notify(childInterruptSockWriter_); + } } void TServerSocket::close() { + concurrency::Guard g(rwMutex_); if (serverSocket_ != THRIFT_INVALID_SOCKET) { shutdown(serverSocket_, THRIFT_SHUT_RDWR); ::THRIFT_CLOSESOCKET(serverSocket_); diff --git a/lib/cpp/src/thrift/transport/TServerSocket.h b/lib/cpp/src/thrift/transport/TServerSocket.h index cb11dc49a..1daaa8299 100644 --- a/lib/cpp/src/thrift/transport/TServerSocket.h +++ b/lib/cpp/src/thrift/transport/TServerSocket.h @@ -20,6 +20,7 @@ #ifndef _THRIFT_TRANSPORT_TSERVERSOCKET_H_ #define _THRIFT_TRANSPORT_TSERVERSOCKET_H_ 1 +#include <thrift/concurrency/Mutex.h> #include <thrift/stdcxx.h> #include <thrift/transport/PlatformSocket.h> #include <thrift/transport/TServerTransport.h> @@ -169,6 +170,7 @@ private: bool keepAlive_; bool listening_; + concurrency::Mutex rwMutex_; // thread-safe interrupt THRIFT_SOCKET interruptSockWriter_; // is notified on interrupt() THRIFT_SOCKET interruptSockReader_; // is used in select/poll with serverSocket_ for interruptability THRIFT_SOCKET childInterruptSockWriter_; // is notified on interruptChildren() diff --git a/lib/cpp/test/concurrency/Tests.cpp b/lib/cpp/test/concurrency/Tests.cpp index 33af39281..f49bb9fea 100644 --- a/lib/cpp/test/concurrency/Tests.cpp +++ b/lib/cpp/test/concurrency/Tests.cpp @@ -25,6 +25,10 @@ #include "TimerManagerTests.h" #include "ThreadManagerTests.h" +// The test weight, where 10 is 10 times more threads than baseline +// and the baseline is optimized for running in valgrind +static size_t WEIGHT = 10; + int main(int argc, char** argv) { std::string arg; @@ -37,6 +41,11 @@ int main(int argc, char** argv) { args[ix - 1] = std::string(argv[ix]); } + if (getenv("VALGRIND") != 0) { + // lower the scale of every test + WEIGHT = 1; + } + bool runAll = args[0].compare("all") == 0; if (runAll || args[0].compare("thread-factory") == 0) { @@ -45,10 +54,10 @@ int main(int argc, char** argv) { std::cout << "ThreadFactory tests..." << std::endl; - int reapLoops = 20; - int reapCount = 1000; + int reapLoops = 2 * WEIGHT; + int reapCount = 100 * WEIGHT; size_t floodLoops = 3; - size_t floodCount = 20000; + size_t floodCount = 500 * WEIGHT; std::cout << "\t\tThreadFactory reap N threads test: N = " << reapLoops << "x" << reapCount << std::endl; @@ -121,8 +130,8 @@ int main(int argc, char** argv) { std::cout << "ThreadManager tests..." << std::endl; { - size_t workerCount = 100; - size_t taskCount = 50000; + size_t workerCount = 10 * WEIGHT; + size_t taskCount = 500 * WEIGHT; int64_t delay = 10LL; ThreadManagerTests threadManagerTests; @@ -160,13 +169,13 @@ int main(int argc, char** argv) { size_t minWorkerCount = 2; - size_t maxWorkerCount = 64; + size_t maxWorkerCount = 8; - size_t tasksPerWorker = 1000; + size_t tasksPerWorker = 100 * WEIGHT; int64_t delay = 5LL; - for (size_t workerCount = minWorkerCount; workerCount < maxWorkerCount; workerCount *= 4) { + for (size_t workerCount = minWorkerCount; workerCount <= maxWorkerCount; workerCount *= 4) { size_t taskCount = workerCount * tasksPerWorker; diff --git a/lib/cpp/test/concurrency/ThreadFactoryTests.h b/lib/cpp/test/concurrency/ThreadFactoryTests.h index bd6ed323d..6ac9aa51c 100644 --- a/lib/cpp/test/concurrency/ThreadFactoryTests.h +++ b/lib/cpp/test/concurrency/ThreadFactoryTests.h @@ -21,11 +21,12 @@ #include <thrift/concurrency/Thread.h> #include <thrift/concurrency/PlatformThreadFactory.h> #include <thrift/concurrency/Monitor.h> +#include <thrift/concurrency/Mutex.h> #include <thrift/concurrency/Util.h> #include <assert.h> #include <iostream> -#include <set> +#include <vector> namespace apache { namespace thrift { @@ -78,13 +79,13 @@ public: int* activeCount = new int(count); - std::set<shared_ptr<Thread> > threads; + std::vector<shared_ptr<Thread> > threads; int tix; for (tix = 0; tix < count; tix++) { try { - threads.insert( + threads.push_back( threadFactory.newThread(shared_ptr<Runnable>(new ReapNTask(*monitor, *activeCount)))); } catch (SystemResourceException& e) { std::cout << "\t\t\tfailed to create " << lix* count + tix << " thread " << e.what() @@ -94,7 +95,7 @@ public: } tix = 0; - for (std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); + for (std::vector<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); tix++, ++thread) { @@ -113,6 +114,7 @@ public: monitor->wait(1000); } } + delete activeCount; std::cout << "\t\t\treaped " << lix* count << " threads" << std::endl; } @@ -253,19 +255,22 @@ public: class FloodTask : public Runnable { public: - FloodTask(const size_t id) : _id(id) {} + FloodTask(const size_t id, Monitor& mon) : _id(id), _mon(mon) {} ~FloodTask() { if (_id % 10000 == 0) { + Synchronized sync(_mon); std::cout << "\t\tthread " << _id << " done" << std::endl; } } void run() { if (_id % 10000 == 0) { + Synchronized sync(_mon); std::cout << "\t\tthread " << _id << " started" << std::endl; } } const size_t _id; + Monitor& _mon; }; void foo(PlatformThreadFactory* tf) { (void)tf; } @@ -273,7 +278,8 @@ public: bool floodNTest(size_t loop = 1, size_t count = 100000) { bool success = false; - + Monitor mon; + for (size_t lix = 0; lix < loop; lix++) { PlatformThreadFactory threadFactory = PlatformThreadFactory(); @@ -283,10 +289,8 @@ public: try { - shared_ptr<FloodTask> task(new FloodTask(lix * count + tix)); - + shared_ptr<FloodTask> task(new FloodTask(lix * count + tix, mon)); shared_ptr<Thread> thread = threadFactory.newThread(task); - thread->start(); } catch (TException& e) { @@ -298,8 +302,8 @@ public: } } + Synchronized sync(mon); std::cout << "\t\t\tflooded " << (lix + 1) * count << " threads" << std::endl; - success = true; } diff --git a/lib/cpp/test/concurrency/ThreadManagerTests.h b/lib/cpp/test/concurrency/ThreadManagerTests.h index c07a21b55..9ecd6bad5 100644 --- a/lib/cpp/test/concurrency/ThreadManagerTests.h +++ b/lib/cpp/test/concurrency/ThreadManagerTests.h @@ -109,7 +109,7 @@ public: shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount); shared_ptr<PlatformThreadFactory> threadFactory - = shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory()); + = shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory(false)); #if !USE_BOOST_THREAD && !USE_STD_THREAD threadFactory->setPriority(PosixThreadFactory::HIGHEST); diff --git a/test/valgrind.suppress b/test/valgrind.suppress index 41f9414e6..de17cb8d8 100644 --- a/test/valgrind.suppress +++ b/test/valgrind.suppress @@ -5,5 +5,49 @@ fun:malloc fun:_ZN5boost6detail25get_once_per_thread_epochEv } +{ + boostthreads/once/ignore + Helgrind:Race + fun:_ZN5boost13thread_detail17enter_once_regionERNS_9once_flagE + fun:_ZN5boost6detail23get_current_thread_dataEv + fun:_ZN5boost6detail20interruption_checkerC1EP15pthread_mutex_tP14pthread_cond_t + fun:_ZN5boost22condition_variable_any4waitINS_11unique_lockINS_11timed_mutexEEEEEvRT_ + fun:_ZN6apache6thrift11concurrency7Monitor4Impl11waitForeverEv + fun:_ZN6apache6thrift11concurrency7Monitor4Impl19waitForTimeRelativeEl + fun:_ZN6apache6thrift11concurrency7Monitor4Impl4waitEl + fun:_ZNK6apache6thrift11concurrency7Monitor4waitEl + fun:_ZN6apache6thrift11concurrency11BoostThread5startEv + fun:_ZN6apache6thrift11concurrency4test18ThreadFactoryTests12reapNThreadsEii + fun:main +} +{ + pthread/creation-tls/ignore + Helgrind:Race + fun:mempcpy + fun:_dl_allocate_tls_init + fun:get_cached_stack + fun:allocate_stack + fun:pthread_create@@GLIBC_2.2* + obj:/usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so + fun:_ZN6apache6thrift11concurrency13PthreadThread5startEv + fun:_ZN6apache6thrift11concurrency4test18ThreadFactoryTests12reapNThreadsEii + fun:main +} +{ + boost-thread/creation-tls/ignore + Helgrind:Race + fun:mempcpy + fun:_dl_allocate_tls_init + fun:get_cached_stack + fun:allocate_stack + fun:pthread_create@@GLIBC_2.2.5 + obj:/usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so + fun:_ZN5boost6thread21start_thread_noexceptEv + fun:_ZN5boost6thread12start_threadEv + fun:_ZN5boost6threadC1ISt5_BindIFPFPvS3_ES3_EEEEOT_ + fun:_ZN6apache6thrift11concurrency11BoostThread5startEv + fun:_ZN6apache6thrift11concurrency4test18ThreadFactoryTests12reapNThreadsEii + fun:main +} |