summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xappveyor.yml2
-rw-r--r--build/docker/centos-7.3/Dockerfile6
-rw-r--r--build/docker/debian-stretch/Dockerfile1
-rw-r--r--build/docker/ubuntu-xenial/Dockerfile1
-rw-r--r--lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp46
-rw-r--r--lib/cpp/src/thrift/concurrency/BoostThreadFactory.h2
-rw-r--r--lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp35
-rw-r--r--lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp50
-rw-r--r--lib/cpp/src/thrift/transport/TServerSocket.cpp11
-rw-r--r--lib/cpp/src/thrift/transport/TServerSocket.h2
-rw-r--r--lib/cpp/test/concurrency/Tests.cpp25
-rw-r--r--lib/cpp/test/concurrency/ThreadFactoryTests.h24
-rw-r--r--lib/cpp/test/concurrency/ThreadManagerTests.h2
-rw-r--r--test/valgrind.suppress44
14 files changed, 180 insertions, 71 deletions
diff --git a/appveyor.yml b/appveyor.yml
index fc09f87f9..4c2e36496 100755
--- a/appveyor.yml
+++ b/appveyor.yml
@@ -40,7 +40,7 @@ environment:
LIBEVENT_VERSION: 2.0.22
QT_VERSION: 5.6
ZLIB_VERSION: 1.2.8
- DISABLED_TESTS: StressTestNonBlocking|concurrency_test
+ DISABLED_TESTS: StressTestNonBlocking
- PROFILE: MSVC2015
PLATFORM: x64
diff --git a/build/docker/centos-7.3/Dockerfile b/build/docker/centos-7.3/Dockerfile
index f79939c72..096bbaa45 100644
--- a/build/docker/centos-7.3/Dockerfile
+++ b/build/docker/centos-7.3/Dockerfile
@@ -10,11 +10,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-# Apache Thrift Docker build environment for Centos
+# Apache Thrift Docker build environment for CentOS
#
# Known missing client libraries:
# - dotnet (will update to 2.0.0 separately)
-# - haxe (not in debian stretch)
+# - haxe (not in centos)
FROM centos:7.3.1611
MAINTAINER Apache Thrift <dev@thrift.apache.org>
@@ -33,12 +33,14 @@ RUN yum install -y \
flex \
gcc \
gcc-c++ \
+ gdb \
git \
libtool \
m4 \
make \
tar \
unzip \
+ valgrind \
wget && \
ln -s /usr/bin/cmake3 /usr/bin/cmake && \
ln -s /usr/bin/cpack3 /usr/bin/cpack && \
diff --git a/build/docker/debian-stretch/Dockerfile b/build/docker/debian-stretch/Dockerfile
index 70309fbe0..503eecd42 100644
--- a/build/docker/debian-stretch/Dockerfile
+++ b/build/docker/debian-stretch/Dockerfile
@@ -56,6 +56,7 @@ RUN apt-get install -y --no-install-recommends \
gdb \
ninja-build \
pkg-config \
+ valgrind \
vim
diff --git a/build/docker/ubuntu-xenial/Dockerfile b/build/docker/ubuntu-xenial/Dockerfile
index 6bad6a94c..6324ec238 100644
--- a/build/docker/ubuntu-xenial/Dockerfile
+++ b/build/docker/ubuntu-xenial/Dockerfile
@@ -60,6 +60,7 @@ RUN apt-get install -y --no-install-recommends \
llvm \
ninja-build \
pkg-config \
+ valgrind \
vim
ENV PATH /usr/lib/llvm-3.8/bin:$PATH
diff --git a/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp b/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp
index 3661275e8..d7d8d54e9 100644
--- a/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp
+++ b/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp
@@ -51,6 +51,7 @@ public:
private:
scoped_ptr<boost::thread> thread_;
+ Monitor monitor_;
STATE state_;
weak_ptr<BoostThread> self_;
bool detached_;
@@ -71,25 +72,46 @@ public:
}
}
- void start() {
- if (state_ != uninitialized) {
- return;
+ STATE getState() const
+ {
+ Synchronized sync(monitor_);
+ return state_;
+ }
+
+ void setState(STATE newState)
+ {
+ Synchronized sync(monitor_);
+ state_ = newState;
+
+ // unblock start() with the knowledge that the thread has actually
+ // started running, which avoids a race in detached threads.
+ if (newState == started) {
+ monitor_.notify();
}
+ }
+ void start() {
// Create reference
shared_ptr<BoostThread>* selfRef = new shared_ptr<BoostThread>();
*selfRef = self_.lock();
- state_ = starting;
+ setState(starting);
+ Synchronized sync(monitor_);
+
thread_.reset(new boost::thread(bind(threadMain, (void*)selfRef)));
if (detached_)
thread_->detach();
+
+ // Wait for the thread to start and get far enough to grab everything
+ // that it needs from the calling context, thus absolving the caller
+ // from being required to hold on to runnable indefinitely.
+ monitor_.wait();
}
void join() {
- if (!detached_ && state_ != uninitialized) {
+ if (!detached_ && getState() != uninitialized) {
thread_->join();
}
}
@@ -110,19 +132,11 @@ void* BoostThread::threadMain(void* arg) {
shared_ptr<BoostThread> thread = *(shared_ptr<BoostThread>*)arg;
delete reinterpret_cast<shared_ptr<BoostThread>*>(arg);
- if (!thread) {
- return (void*)0;
- }
-
- if (thread->state_ != starting) {
- return (void*)0;
- }
-
- thread->state_ = started;
+ thread->setState(started);
thread->runnable()->run();
- if (thread->state_ != stopping && thread->state_ != stopped) {
- thread->state_ = stopping;
+ if (thread->getState() != stopping && thread->getState() != stopped) {
+ thread->setState(stopping);
}
return (void*)0;
}
diff --git a/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h b/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h
index 4c7a45abb..bf11a708b 100644
--- a/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h
+++ b/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h
@@ -20,8 +20,8 @@
#ifndef _THRIFT_CONCURRENCY_BOOSTTHREADFACTORY_H_
#define _THRIFT_CONCURRENCY_BOOSTTHREADFACTORY_H_ 1
+#include <thrift/concurrency/Monitor.h>
#include <thrift/concurrency/Thread.h>
-
#include <thrift/stdcxx.h>
namespace apache {
diff --git a/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp b/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp
index d829d6905..2e35446b5 100644
--- a/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp
+++ b/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp
@@ -20,7 +20,7 @@
#include <thrift/thrift-config.h>
#include <thrift/concurrency/Exception.h>
-#include <thrift/concurrency/Mutex.h>
+#include <thrift/concurrency/Monitor.h>
#include <thrift/concurrency/PosixThreadFactory.h>
#if GOOGLE_PERFTOOLS_REGISTER_THREAD
@@ -53,8 +53,8 @@ public:
private:
pthread_t pthread_;
- Mutex state_mutex_;
- STATE state_;
+ Monitor monitor_; // guard to protect state_ and also notification
+ STATE state_; // to protect proper thread start behavior
int policy_;
int priority_;
int stackSize_;
@@ -96,14 +96,20 @@ public:
STATE getState() const
{
- Guard g(state_mutex_);
+ Synchronized sync(monitor_);
return state_;
}
void setState(STATE newState)
{
- Guard g(state_mutex_);
+ Synchronized sync(monitor_);
state_ = newState;
+
+ // unblock start() with the knowledge that the thread has actually
+ // started running, which avoids a race in detached threads.
+ if (newState == started) {
+ monitor_.notify();
+ }
}
void start() {
@@ -154,9 +160,18 @@ public:
setState(starting);
+ Synchronized sync(monitor_);
+
if (pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) != 0) {
throw SystemResourceException("pthread_create failed");
}
+
+ // The caller may not choose to guarantee the scope of the Runnable
+ // being used in the thread, so we must actually wait until the thread
+ // starts before we return. If we do not wait, it would be possible
+ // for the caller to start destructing the Runnable and the Thread,
+ // and we would end up in a race. This was identified with valgrind.
+ monitor_.wait();
}
void join() {
@@ -174,8 +189,6 @@ public:
if (res != 0) {
GlobalOutput.printf("PthreadThread::join(): fail with code %d", res);
}
- } else {
- GlobalOutput.printf("PthreadThread::join(): detached thread");
}
}
@@ -202,14 +215,6 @@ void* PthreadThread::threadMain(void* arg) {
stdcxx::shared_ptr<PthreadThread> thread = *(stdcxx::shared_ptr<PthreadThread>*)arg;
delete reinterpret_cast<stdcxx::shared_ptr<PthreadThread>*>(arg);
- if (thread == NULL) {
- return (void*)0;
- }
-
- if (thread->getState() != starting) {
- return (void*)0;
- }
-
#if GOOGLE_PERFTOOLS_REGISTER_THREAD
ProfilerRegisterThread();
#endif
diff --git a/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp b/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp
index 4067f2418..da0c5e373 100644
--- a/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp
+++ b/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp
@@ -21,8 +21,9 @@
#if USE_STD_THREAD
-#include <thrift/concurrency/StdThreadFactory.h>
#include <thrift/concurrency/Exception.h>
+#include <thrift/concurrency/Monitor.h>
+#include <thrift/concurrency/StdThreadFactory.h>
#include <thrift/stdcxx.h>
#include <cassert>
@@ -49,6 +50,7 @@ public:
private:
std::unique_ptr<std::thread> thread_;
+ Monitor monitor_;
STATE state_;
bool detached_;
@@ -68,18 +70,42 @@ public:
}
}
+ STATE getState() const
+ {
+ Synchronized sync(monitor_);
+ return state_;
+ }
+
+ void setState(STATE newState)
+ {
+ Synchronized sync(monitor_);
+ state_ = newState;
+
+ // unblock start() with the knowledge that the thread has actually
+ // started running, which avoids a race in detached threads.
+ if (newState == started) {
+ monitor_.notify();
+ }
+ }
+
void start() {
- if (state_ != uninitialized) {
+ if (getState() != uninitialized) {
return;
}
stdcxx::shared_ptr<StdThread> selfRef = shared_from_this();
- state_ = starting;
+ setState(starting);
+ Synchronized sync(monitor_);
thread_ = std::unique_ptr<std::thread>(new std::thread(threadMain, selfRef));
if (detached_)
thread_->detach();
+
+ // Wait for the thread to start and get far enough to grab everything
+ // that it needs from the calling context, thus absolving the caller
+ // from being required to hold on to runnable indefinitely.
+ monitor_.wait();
}
void join() {
@@ -96,22 +122,16 @@ public:
};
void StdThread::threadMain(stdcxx::shared_ptr<StdThread> thread) {
- if (thread == NULL) {
- return;
- }
+#if GOOGLE_PERFTOOLS_REGISTER_THREAD
+ ProfilerRegisterThread();
+#endif
- if (thread->state_ != starting) {
- return;
- }
-
- thread->state_ = started;
+ thread->setState(started);
thread->runnable()->run();
- if (thread->state_ != stopping && thread->state_ != stopped) {
- thread->state_ = stopping;
+ if (thread->getState() != stopping && thread->getState() != stopped) {
+ thread->setState(stopping);
}
-
- return;
}
StdThreadFactory::StdThreadFactory(bool detached) : ThreadFactory(detached) {
diff --git a/lib/cpp/src/thrift/transport/TServerSocket.cpp b/lib/cpp/src/thrift/transport/TServerSocket.cpp
index a704b068c..3179b1aec 100644
--- a/lib/cpp/src/thrift/transport/TServerSocket.cpp
+++ b/lib/cpp/src/thrift/transport/TServerSocket.cpp
@@ -658,14 +658,21 @@ void TServerSocket::notify(THRIFT_SOCKET notifySocket) {
}
void TServerSocket::interrupt() {
- notify(interruptSockWriter_);
+ concurrency::Guard g(rwMutex_);
+ if (interruptSockWriter_ != THRIFT_INVALID_SOCKET) {
+ notify(interruptSockWriter_);
+ }
}
void TServerSocket::interruptChildren() {
- notify(childInterruptSockWriter_);
+ concurrency::Guard g(rwMutex_);
+ if (childInterruptSockWriter_ != THRIFT_INVALID_SOCKET) {
+ notify(childInterruptSockWriter_);
+ }
}
void TServerSocket::close() {
+ concurrency::Guard g(rwMutex_);
if (serverSocket_ != THRIFT_INVALID_SOCKET) {
shutdown(serverSocket_, THRIFT_SHUT_RDWR);
::THRIFT_CLOSESOCKET(serverSocket_);
diff --git a/lib/cpp/src/thrift/transport/TServerSocket.h b/lib/cpp/src/thrift/transport/TServerSocket.h
index cb11dc49a..1daaa8299 100644
--- a/lib/cpp/src/thrift/transport/TServerSocket.h
+++ b/lib/cpp/src/thrift/transport/TServerSocket.h
@@ -20,6 +20,7 @@
#ifndef _THRIFT_TRANSPORT_TSERVERSOCKET_H_
#define _THRIFT_TRANSPORT_TSERVERSOCKET_H_ 1
+#include <thrift/concurrency/Mutex.h>
#include <thrift/stdcxx.h>
#include <thrift/transport/PlatformSocket.h>
#include <thrift/transport/TServerTransport.h>
@@ -169,6 +170,7 @@ private:
bool keepAlive_;
bool listening_;
+ concurrency::Mutex rwMutex_; // thread-safe interrupt
THRIFT_SOCKET interruptSockWriter_; // is notified on interrupt()
THRIFT_SOCKET interruptSockReader_; // is used in select/poll with serverSocket_ for interruptability
THRIFT_SOCKET childInterruptSockWriter_; // is notified on interruptChildren()
diff --git a/lib/cpp/test/concurrency/Tests.cpp b/lib/cpp/test/concurrency/Tests.cpp
index 33af39281..f49bb9fea 100644
--- a/lib/cpp/test/concurrency/Tests.cpp
+++ b/lib/cpp/test/concurrency/Tests.cpp
@@ -25,6 +25,10 @@
#include "TimerManagerTests.h"
#include "ThreadManagerTests.h"
+// The test weight, where 10 is 10 times more threads than baseline
+// and the baseline is optimized for running in valgrind
+static size_t WEIGHT = 10;
+
int main(int argc, char** argv) {
std::string arg;
@@ -37,6 +41,11 @@ int main(int argc, char** argv) {
args[ix - 1] = std::string(argv[ix]);
}
+ if (getenv("VALGRIND") != 0) {
+ // lower the scale of every test
+ WEIGHT = 1;
+ }
+
bool runAll = args[0].compare("all") == 0;
if (runAll || args[0].compare("thread-factory") == 0) {
@@ -45,10 +54,10 @@ int main(int argc, char** argv) {
std::cout << "ThreadFactory tests..." << std::endl;
- int reapLoops = 20;
- int reapCount = 1000;
+ int reapLoops = 2 * WEIGHT;
+ int reapCount = 100 * WEIGHT;
size_t floodLoops = 3;
- size_t floodCount = 20000;
+ size_t floodCount = 500 * WEIGHT;
std::cout << "\t\tThreadFactory reap N threads test: N = " << reapLoops << "x" << reapCount << std::endl;
@@ -121,8 +130,8 @@ int main(int argc, char** argv) {
std::cout << "ThreadManager tests..." << std::endl;
{
- size_t workerCount = 100;
- size_t taskCount = 50000;
+ size_t workerCount = 10 * WEIGHT;
+ size_t taskCount = 500 * WEIGHT;
int64_t delay = 10LL;
ThreadManagerTests threadManagerTests;
@@ -160,13 +169,13 @@ int main(int argc, char** argv) {
size_t minWorkerCount = 2;
- size_t maxWorkerCount = 64;
+ size_t maxWorkerCount = 8;
- size_t tasksPerWorker = 1000;
+ size_t tasksPerWorker = 100 * WEIGHT;
int64_t delay = 5LL;
- for (size_t workerCount = minWorkerCount; workerCount < maxWorkerCount; workerCount *= 4) {
+ for (size_t workerCount = minWorkerCount; workerCount <= maxWorkerCount; workerCount *= 4) {
size_t taskCount = workerCount * tasksPerWorker;
diff --git a/lib/cpp/test/concurrency/ThreadFactoryTests.h b/lib/cpp/test/concurrency/ThreadFactoryTests.h
index bd6ed323d..6ac9aa51c 100644
--- a/lib/cpp/test/concurrency/ThreadFactoryTests.h
+++ b/lib/cpp/test/concurrency/ThreadFactoryTests.h
@@ -21,11 +21,12 @@
#include <thrift/concurrency/Thread.h>
#include <thrift/concurrency/PlatformThreadFactory.h>
#include <thrift/concurrency/Monitor.h>
+#include <thrift/concurrency/Mutex.h>
#include <thrift/concurrency/Util.h>
#include <assert.h>
#include <iostream>
-#include <set>
+#include <vector>
namespace apache {
namespace thrift {
@@ -78,13 +79,13 @@ public:
int* activeCount = new int(count);
- std::set<shared_ptr<Thread> > threads;
+ std::vector<shared_ptr<Thread> > threads;
int tix;
for (tix = 0; tix < count; tix++) {
try {
- threads.insert(
+ threads.push_back(
threadFactory.newThread(shared_ptr<Runnable>(new ReapNTask(*monitor, *activeCount))));
} catch (SystemResourceException& e) {
std::cout << "\t\t\tfailed to create " << lix* count + tix << " thread " << e.what()
@@ -94,7 +95,7 @@ public:
}
tix = 0;
- for (std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin();
+ for (std::vector<shared_ptr<Thread> >::const_iterator thread = threads.begin();
thread != threads.end();
tix++, ++thread) {
@@ -113,6 +114,7 @@ public:
monitor->wait(1000);
}
}
+
delete activeCount;
std::cout << "\t\t\treaped " << lix* count << " threads" << std::endl;
}
@@ -253,19 +255,22 @@ public:
class FloodTask : public Runnable {
public:
- FloodTask(const size_t id) : _id(id) {}
+ FloodTask(const size_t id, Monitor& mon) : _id(id), _mon(mon) {}
~FloodTask() {
if (_id % 10000 == 0) {
+ Synchronized sync(_mon);
std::cout << "\t\tthread " << _id << " done" << std::endl;
}
}
void run() {
if (_id % 10000 == 0) {
+ Synchronized sync(_mon);
std::cout << "\t\tthread " << _id << " started" << std::endl;
}
}
const size_t _id;
+ Monitor& _mon;
};
void foo(PlatformThreadFactory* tf) { (void)tf; }
@@ -273,7 +278,8 @@ public:
bool floodNTest(size_t loop = 1, size_t count = 100000) {
bool success = false;
-
+ Monitor mon;
+
for (size_t lix = 0; lix < loop; lix++) {
PlatformThreadFactory threadFactory = PlatformThreadFactory();
@@ -283,10 +289,8 @@ public:
try {
- shared_ptr<FloodTask> task(new FloodTask(lix * count + tix));
-
+ shared_ptr<FloodTask> task(new FloodTask(lix * count + tix, mon));
shared_ptr<Thread> thread = threadFactory.newThread(task);
-
thread->start();
} catch (TException& e) {
@@ -298,8 +302,8 @@ public:
}
}
+ Synchronized sync(mon);
std::cout << "\t\t\tflooded " << (lix + 1) * count << " threads" << std::endl;
-
success = true;
}
diff --git a/lib/cpp/test/concurrency/ThreadManagerTests.h b/lib/cpp/test/concurrency/ThreadManagerTests.h
index c07a21b55..9ecd6bad5 100644
--- a/lib/cpp/test/concurrency/ThreadManagerTests.h
+++ b/lib/cpp/test/concurrency/ThreadManagerTests.h
@@ -109,7 +109,7 @@ public:
shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
shared_ptr<PlatformThreadFactory> threadFactory
- = shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory());
+ = shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory(false));
#if !USE_BOOST_THREAD && !USE_STD_THREAD
threadFactory->setPriority(PosixThreadFactory::HIGHEST);
diff --git a/test/valgrind.suppress b/test/valgrind.suppress
index 41f9414e6..de17cb8d8 100644
--- a/test/valgrind.suppress
+++ b/test/valgrind.suppress
@@ -5,5 +5,49 @@
fun:malloc
fun:_ZN5boost6detail25get_once_per_thread_epochEv
}
+{
+ boostthreads/once/ignore
+ Helgrind:Race
+ fun:_ZN5boost13thread_detail17enter_once_regionERNS_9once_flagE
+ fun:_ZN5boost6detail23get_current_thread_dataEv
+ fun:_ZN5boost6detail20interruption_checkerC1EP15pthread_mutex_tP14pthread_cond_t
+ fun:_ZN5boost22condition_variable_any4waitINS_11unique_lockINS_11timed_mutexEEEEEvRT_
+ fun:_ZN6apache6thrift11concurrency7Monitor4Impl11waitForeverEv
+ fun:_ZN6apache6thrift11concurrency7Monitor4Impl19waitForTimeRelativeEl
+ fun:_ZN6apache6thrift11concurrency7Monitor4Impl4waitEl
+ fun:_ZNK6apache6thrift11concurrency7Monitor4waitEl
+ fun:_ZN6apache6thrift11concurrency11BoostThread5startEv
+ fun:_ZN6apache6thrift11concurrency4test18ThreadFactoryTests12reapNThreadsEii
+ fun:main
+}
+{
+ pthread/creation-tls/ignore
+ Helgrind:Race
+ fun:mempcpy
+ fun:_dl_allocate_tls_init
+ fun:get_cached_stack
+ fun:allocate_stack
+ fun:pthread_create@@GLIBC_2.2*
+ obj:/usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so
+ fun:_ZN6apache6thrift11concurrency13PthreadThread5startEv
+ fun:_ZN6apache6thrift11concurrency4test18ThreadFactoryTests12reapNThreadsEii
+ fun:main
+}
+{
+ boost-thread/creation-tls/ignore
+ Helgrind:Race
+ fun:mempcpy
+ fun:_dl_allocate_tls_init
+ fun:get_cached_stack
+ fun:allocate_stack
+ fun:pthread_create@@GLIBC_2.2.5
+ obj:/usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so
+ fun:_ZN5boost6thread21start_thread_noexceptEv
+ fun:_ZN5boost6thread12start_threadEv
+ fun:_ZN5boost6threadC1ISt5_BindIFPFPvS3_ES3_EEEEOT_
+ fun:_ZN6apache6thrift11concurrency11BoostThread5startEv
+ fun:_ZN6apache6thrift11concurrency4test18ThreadFactoryTests12reapNThreadsEii
+ fun:main
+}