summaryrefslogtreecommitdiff
path: root/lib/cpp
diff options
context:
space:
mode:
authorcyy <cyyever@outlook.com>2019-01-12 14:38:28 +0800
committercyy <cyyever@outlook.com>2019-01-22 10:36:18 +0800
commitbfdbd0344bd3ddf348985b51e02212e8092859d4 (patch)
tree78ba8220a1f8a72f94eb79c053b89df3a3d77f2a /lib/cpp
parentd12dbed670acb9fbe65662fb72d2069e5ee6d0a0 (diff)
downloadthrift-bfdbd0344bd3ddf348985b51e02212e8092859d4.tar.gz
use chrono
Diffstat (limited to 'lib/cpp')
-rwxr-xr-xlib/cpp/CMakeLists.txt1
-rwxr-xr-xlib/cpp/Makefile.am4
-rw-r--r--lib/cpp/src/thrift/concurrency/Monitor.cpp54
-rw-r--r--lib/cpp/src/thrift/concurrency/Monitor.h23
-rw-r--r--lib/cpp/src/thrift/concurrency/ThreadManager.cpp28
-rw-r--r--lib/cpp/src/thrift/concurrency/TimerManager.cpp91
-rw-r--r--lib/cpp/src/thrift/concurrency/TimerManager.h20
-rw-r--r--lib/cpp/src/thrift/concurrency/Util.cpp44
-rw-r--r--lib/cpp/src/thrift/concurrency/Util.h151
-rw-r--r--lib/cpp/src/thrift/transport/TFileTransport.cpp26
-rw-r--r--lib/cpp/src/thrift/transport/TFileTransport.h4
-rw-r--r--lib/cpp/test/concurrency/Tests.cpp8
-rw-r--r--lib/cpp/test/concurrency/ThreadFactoryTests.h5
-rw-r--r--lib/cpp/test/concurrency/ThreadManagerTests.h13
-rw-r--r--lib/cpp/test/concurrency/TimerManagerTests.h5
15 files changed, 103 insertions, 374 deletions
diff --git a/lib/cpp/CMakeLists.txt b/lib/cpp/CMakeLists.txt
index 90a6c2885..f4e810461 100755
--- a/lib/cpp/CMakeLists.txt
+++ b/lib/cpp/CMakeLists.txt
@@ -36,7 +36,6 @@ set( thriftcpp_SOURCES
src/thrift/async/TConcurrentClientSyncInfo.cpp
src/thrift/concurrency/ThreadManager.cpp
src/thrift/concurrency/TimerManager.cpp
- src/thrift/concurrency/Util.cpp
src/thrift/processor/PeekProcessor.cpp
src/thrift/protocol/TBase64Utils.cpp
src/thrift/protocol/TDebugProtocol.cpp
diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am
index de0c058c1..db9d751e4 100755
--- a/lib/cpp/Makefile.am
+++ b/lib/cpp/Makefile.am
@@ -67,7 +67,6 @@ libthrift_la_SOURCES = src/thrift/TApplicationException.cpp \
src/thrift/async/TConcurrentClientSyncInfo.cpp \
src/thrift/concurrency/ThreadManager.cpp \
src/thrift/concurrency/TimerManager.cpp \
- src/thrift/concurrency/Util.cpp \
src/thrift/processor/PeekProcessor.cpp \
src/thrift/protocol/TDebugProtocol.cpp \
src/thrift/protocol/TJSONProtocol.cpp \
@@ -155,8 +154,7 @@ include_concurrency_HEADERS = \
src/thrift/concurrency/Thread.h \
src/thrift/concurrency/ThreadManager.h \
src/thrift/concurrency/TimerManager.h \
- src/thrift/concurrency/FunctionRunner.h \
- src/thrift/concurrency/Util.h
+ src/thrift/concurrency/FunctionRunner.h
include_protocoldir = $(include_thriftdir)/protocol
include_protocol_HEADERS = \
diff --git a/lib/cpp/src/thrift/concurrency/Monitor.cpp b/lib/cpp/src/thrift/concurrency/Monitor.cpp
index 7b3b209a7..99d52b3e3 100644
--- a/lib/cpp/src/thrift/concurrency/Monitor.cpp
+++ b/lib/cpp/src/thrift/concurrency/Monitor.cpp
@@ -21,7 +21,6 @@
#include <thrift/concurrency/Monitor.h>
#include <thrift/concurrency/Exception.h>
-#include <thrift/concurrency/Util.h>
#include <thrift/transport/PlatformSocket.h>
#include <assert.h>
@@ -61,8 +60,8 @@ public:
* If the condition occurs, this function returns cleanly; on timeout or
* error an exception is thrown.
*/
- void wait(int64_t timeout_ms) {
- int result = waitForTimeRelative(timeout_ms);
+ void wait(const std::chrono::milliseconds &timeout) {
+ int result = waitForTimeRelative(timeout);
if (result == THRIFT_ETIMEDOUT) {
throw TimedOutException();
} else if (result != 0) {
@@ -72,12 +71,12 @@ public:
/**
* Waits until the specified timeout in milliseconds for the condition to
- * occur, or waits forever if timeout_ms == 0.
+ * occur, or waits forever if timeout is zero.
*
* Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
*/
- int waitForTimeRelative(int64_t timeout_ms) {
- if (timeout_ms == 0LL) {
+ int waitForTimeRelative(const std::chrono::milliseconds &timeout) {
+ if (timeout.count() == 0) {
return waitForever();
}
@@ -86,46 +85,23 @@ public:
assert(mutexImpl);
std::unique_lock<std::timed_mutex> lock(*mutexImpl, std::adopt_lock);
- bool timedout = (conditionVariable_.wait_for(lock, std::chrono::milliseconds(timeout_ms))
+ bool timedout = (conditionVariable_.wait_for(lock, timeout)
== std::cv_status::timeout);
lock.release();
return (timedout ? THRIFT_ETIMEDOUT : 0);
}
/**
- * Waits until the absolute time specified using struct THRIFT_TIMESPEC.
+ * Waits until the absolute time specified by abstime.
* Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
*/
- int waitForTime(const THRIFT_TIMESPEC* abstime) {
- struct timeval temp;
- temp.tv_sec = static_cast<long>(abstime->tv_sec);
- temp.tv_usec = static_cast<long>(abstime->tv_nsec) / 1000;
- return waitForTime(&temp);
- }
-
- /**
- * Waits until the absolute time specified using struct timeval.
- * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
- */
- int waitForTime(const struct timeval* abstime) {
+ int waitForTime(const std::chrono::time_point<std::chrono::steady_clock>& abstime) {
assert(mutex_);
std::timed_mutex* mutexImpl = static_cast<std::timed_mutex*>(mutex_->getUnderlyingImpl());
assert(mutexImpl);
- struct timeval currenttime;
- Util::toTimeval(currenttime, Util::currentTime());
-
- long tv_sec = static_cast<long>(abstime->tv_sec - currenttime.tv_sec);
- long tv_usec = static_cast<long>(abstime->tv_usec - currenttime.tv_usec);
- if (tv_sec < 0)
- tv_sec = 0;
- if (tv_usec < 0)
- tv_usec = 0;
-
std::unique_lock<std::timed_mutex> lock(*mutexImpl, std::adopt_lock);
- bool timedout = (conditionVariable_.wait_for(lock,
- std::chrono::seconds(tv_sec)
- + std::chrono::microseconds(tv_usec))
+ bool timedout = (conditionVariable_.wait_until(lock, abstime)
== std::cv_status::timeout);
lock.release();
return (timedout ? THRIFT_ETIMEDOUT : 0);
@@ -181,20 +157,16 @@ void Monitor::unlock() const {
const_cast<Monitor::Impl*>(impl_)->unlock();
}
-void Monitor::wait(int64_t timeout) const {
+void Monitor::wait(const std::chrono::milliseconds &timeout) const {
const_cast<Monitor::Impl*>(impl_)->wait(timeout);
}
-int Monitor::waitForTime(const THRIFT_TIMESPEC* abstime) const {
- return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime);
-}
-
-int Monitor::waitForTime(const timeval* abstime) const {
+int Monitor::waitForTime(const std::chrono::time_point<std::chrono::steady_clock>& abstime) const {
return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime);
}
-int Monitor::waitForTimeRelative(int64_t timeout_ms) const {
- return const_cast<Monitor::Impl*>(impl_)->waitForTimeRelative(timeout_ms);
+int Monitor::waitForTimeRelative(const std::chrono::milliseconds &timeout) const {
+ return const_cast<Monitor::Impl*>(impl_)->waitForTimeRelative(timeout);
}
int Monitor::waitForever() const {
diff --git a/lib/cpp/src/thrift/concurrency/Monitor.h b/lib/cpp/src/thrift/concurrency/Monitor.h
index 11a145d55..c70fc8616 100644
--- a/lib/cpp/src/thrift/concurrency/Monitor.h
+++ b/lib/cpp/src/thrift/concurrency/Monitor.h
@@ -20,6 +20,7 @@
#ifndef _THRIFT_CONCURRENCY_MONITOR_H_
#define _THRIFT_CONCURRENCY_MONITOR_H_ 1
+#include <chrono>
#include <thrift/concurrency/Exception.h>
#include <thrift/concurrency/Mutex.h>
@@ -67,23 +68,19 @@ public:
/**
* Waits a maximum of the specified timeout in milliseconds for the condition
- * to occur, or waits forever if timeout_ms == 0.
+ * to occur, or waits forever if timeout is zero.
*
* Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
*/
- int waitForTimeRelative(int64_t timeout_ms) const;
+ int waitForTimeRelative(const std::chrono::milliseconds &timeout) const;
- /**
- * Waits until the absolute time specified using struct THRIFT_TIMESPEC.
- * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
- */
- int waitForTime(const THRIFT_TIMESPEC* abstime) const;
+ int waitForTimeRelative(int64_t timeout_ms) const { return waitForTimeRelative(std::chrono::milliseconds(timeout_ms)); }
/**
- * Waits until the absolute time specified using struct timeval.
+ * Waits until the absolute time specified by abstime.
* Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
*/
- int waitForTime(const struct timeval* abstime) const;
+ int waitForTime(const std::chrono::time_point<std::chrono::steady_clock>& abstime) const;
/**
* Waits forever until the condition occurs.
@@ -93,12 +90,14 @@ public:
/**
* Exception-throwing version of waitForTimeRelative(), called simply
- * wait(int64) for historical reasons. Timeout is in milliseconds.
+ * wait(std::chrono::milliseconds) for historical reasons. Timeout is in milliseconds.
*
- * If the condition occurs, this function returns cleanly; on timeout or
+ * If the condition occurs, this function returns cleanly; on timeout or
* error an exception is thrown.
*/
- void wait(int64_t timeout_ms = 0LL) const;
+ void wait(const std::chrono::milliseconds &timeout) const;
+
+ void wait(int64_t timeout_ms = 0LL) const { this->wait(std::chrono::milliseconds(timeout_ms)); }
/** Wakes up one thread waiting on this monitor. */
virtual void notify() const;
diff --git a/lib/cpp/src/thrift/concurrency/ThreadManager.cpp b/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
index 5e883270b..4c7c372af 100644
--- a/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
+++ b/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
@@ -22,7 +22,6 @@
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/concurrency/Exception.h>
#include <thrift/concurrency/Monitor.h>
-#include <thrift/concurrency/Util.h>
#include <memory>
@@ -35,6 +34,7 @@ namespace thrift {
namespace concurrency {
using std::shared_ptr;
+using std::unique_ptr;
using std::dynamic_pointer_cast;
/**
@@ -180,10 +180,13 @@ class ThreadManager::Task : public Runnable {
public:
enum STATE { WAITING, EXECUTING, TIMEDOUT, COMPLETE };
- Task(shared_ptr<Runnable> runnable, int64_t expiration = 0LL)
+ Task(shared_ptr<Runnable> runnable, uint64_t expiration = 0ULL)
: runnable_(runnable),
- state_(WAITING),
- expireTime_(expiration != 0LL ? Util::currentTime() + expiration : 0LL) {}
+ state_(WAITING) {
+ if (expiration != 0ULL) {
+ expireTime_.reset(new std::chrono::steady_clock::time_point(std::chrono::steady_clock::now() + std::chrono::milliseconds(expiration)));
+ }
+ }
~Task() {}
@@ -196,13 +199,13 @@ public:
shared_ptr<Runnable> getRunnable() { return runnable_; }
- int64_t getExpireTime() const { return expireTime_; }
+ const unique_ptr<std::chrono::steady_clock::time_point> & getExpireTime() const { return expireTime_; }
private:
shared_ptr<Runnable> runnable_;
friend class ThreadManager::Worker;
STATE state_;
- int64_t expireTime_;
+ unique_ptr<std::chrono::steady_clock::time_point> expireTime_;
};
class ThreadManager::Worker : public Runnable {
@@ -280,7 +283,7 @@ public:
// If the state is changed to anything other than EXECUTING or TIMEDOUT here
// then the execution loop needs to be changed below.
task->state_ =
- (task->getExpireTime() && task->getExpireTime() < Util::currentTime()) ?
+ (task->getExpireTime() && *(task->getExpireTime()) < std::chrono::steady_clock::now()) ?
ThreadManager::Task::TIMEDOUT :
ThreadManager::Task::EXECUTING;
}
@@ -524,15 +527,14 @@ std::shared_ptr<Runnable> ThreadManager::Impl::removeNextPending() {
void ThreadManager::Impl::removeExpired(bool justOne) {
// this is always called under a lock
- int64_t now = 0LL;
+ if (tasks_.empty()) {
+ return;
+ }
+ auto now = std::chrono::steady_clock::now();
for (TaskQueue::iterator it = tasks_.begin(); it != tasks_.end(); )
{
- if (now == 0LL) {
- now = Util::currentTime();
- }
-
- if ((*it)->getExpireTime() > 0LL && (*it)->getExpireTime() < now) {
+ if ((*it)->getExpireTime() && *((*it)->getExpireTime()) < now) {
if (expireCallback_) {
expireCallback_((*it)->getRunnable());
}
diff --git a/lib/cpp/src/thrift/concurrency/TimerManager.cpp b/lib/cpp/src/thrift/concurrency/TimerManager.cpp
index 61a34ff69..edd336be0 100644
--- a/lib/cpp/src/thrift/concurrency/TimerManager.cpp
+++ b/lib/cpp/src/thrift/concurrency/TimerManager.cpp
@@ -19,7 +19,6 @@
#include <thrift/concurrency/TimerManager.h>
#include <thrift/concurrency/Exception.h>
-#include <thrift/concurrency/Util.h>
#include <assert.h>
#include <iostream>
@@ -90,21 +89,22 @@ public:
{
Synchronized s(manager_->monitor_);
task_iterator expiredTaskEnd;
- int64_t now = Util::currentTime();
+ auto now = std::chrono::steady_clock::now();
while (manager_->state_ == TimerManager::STARTED
&& (expiredTaskEnd = manager_->taskMap_.upper_bound(now))
== manager_->taskMap_.begin()) {
- int64_t timeout = 0LL;
+ std::chrono::milliseconds timeout(0);
if (!manager_->taskMap_.empty()) {
- timeout = manager_->taskMap_.begin()->first - now;
- }
- assert((timeout != 0 && manager_->taskCount_ > 0)
- || (timeout == 0 && manager_->taskCount_ == 0));
- try {
- manager_->monitor_.wait(timeout);
- } catch (TimedOutException&) {
+ timeout = std::chrono::duration_cast<std::chrono::milliseconds>(manager_->taskMap_.begin()->first - now);
+ //because the unit of steady_clock is smaller than millisecond,timeout may be 0.
+ if (timeout.count() == 0) {
+ timeout = std::chrono::milliseconds(1);
+ }
+ manager_->monitor_.waitForTimeRelative(timeout);
+ } else {
+ manager_->monitor_.waitForTimeRelative(0);
}
- now = Util::currentTime();
+ now = std::chrono::steady_clock::now();
}
if (manager_->state_ == TimerManager::STARTED) {
@@ -239,64 +239,39 @@ size_t TimerManager::taskCount() const {
return taskCount_;
}
-TimerManager::Timer TimerManager::add(shared_ptr<Runnable> task, int64_t timeout) {
- int64_t now = Util::currentTime();
- timeout += now;
-
- {
- Synchronized s(monitor_);
- if (state_ != TimerManager::STARTED) {
- throw IllegalStateException();
- }
-
- // If the task map is empty, we will kick the dispatcher for sure. Otherwise, we kick him
- // if the expiration time is shorter than the current value. Need to test before we insert,
- // because the new task might insert at the front.
- bool notifyRequired = (taskCount_ == 0) ? true : timeout < taskMap_.begin()->first;
-
- shared_ptr<Task> timer(new Task(task));
- taskCount_++;
- timer->it_ = taskMap_.insert(std::pair<int64_t, shared_ptr<Task> >(timeout, timer));
-
- // If the task map was empty, or if we have an expiration that is earlier
- // than any previously seen, kick the dispatcher so it can update its
- // timeout
- if (notifyRequired) {
- monitor_.notify();
- }
-
- return timer;
- }
+TimerManager::Timer TimerManager::add(shared_ptr<Runnable> task, const std::chrono::milliseconds &timeout) {
+ return add(task, std::chrono::steady_clock::now() + timeout);
}
TimerManager::Timer TimerManager::add(shared_ptr<Runnable> task,
- const struct THRIFT_TIMESPEC& value) {
+ const std::chrono::time_point<std::chrono::steady_clock>& abstime) {
+ auto now = std::chrono::steady_clock::now();
- int64_t expiration;
- Util::toMilliseconds(expiration, value);
-
- int64_t now = Util::currentTime();
-
- if (expiration < now) {
+ if (abstime < now) {
throw InvalidArgumentException();
}
+ Synchronized s(monitor_);
+ if (state_ != TimerManager::STARTED) {
+ throw IllegalStateException();
+ }
- return add(task, expiration - now);
-}
-
-TimerManager::Timer TimerManager::add(shared_ptr<Runnable> task,
- const struct timeval& value) {
-
- int64_t expiration;
- Util::toMilliseconds(expiration, value);
+ // If the task map is empty, we will kick the dispatcher for sure. Otherwise, we kick him
+ // if the expiration time is shorter than the current value. Need to test before we insert,
+ // because the new task might insert at the front.
+ bool notifyRequired = (taskCount_ == 0) ? true : abstime < taskMap_.begin()->first;
- int64_t now = Util::currentTime();
+ shared_ptr<Task> timer(new Task(task));
+ taskCount_++;
+ timer->it_ = taskMap_.emplace(abstime, timer);
- if (expiration < now) {
- throw InvalidArgumentException();
+ // If the task map was empty, or if we have an expiration that is earlier
+ // than any previously seen, kick the dispatcher so it can update its
+ // timeout
+ if (notifyRequired) {
+ monitor_.notify();
}
- return add(task, expiration - now);
+ return timer;
}
void TimerManager::remove(shared_ptr<Runnable> task) {
diff --git a/lib/cpp/src/thrift/concurrency/TimerManager.h b/lib/cpp/src/thrift/concurrency/TimerManager.h
index 4e291e3cb..44d4738d5 100644
--- a/lib/cpp/src/thrift/concurrency/TimerManager.h
+++ b/lib/cpp/src/thrift/concurrency/TimerManager.h
@@ -72,25 +72,17 @@ public:
* @param timeout Time in milliseconds to delay before executing task
* @return Handle of the timer, which can be used to remove the timer.
*/
- virtual Timer add(std::shared_ptr<Runnable> task, int64_t timeout);
+ virtual Timer add(std::shared_ptr<Runnable> task, const std::chrono::milliseconds &timeout);
+ Timer add(std::shared_ptr<Runnable> task, uint64_t timeout) { return add(task,std::chrono::milliseconds(timeout)); }
/**
* Adds a task to be executed at some time in the future by a worker thread.
*
* @param task The task to execute
- * @param timeout Absolute time in the future to execute task.
+ * @param abstime Absolute time in the future to execute task.
* @return Handle of the timer, which can be used to remove the timer.
*/
- virtual Timer add(std::shared_ptr<Runnable> task, const struct THRIFT_TIMESPEC& timeout);
-
- /**
- * Adds a task to be executed at some time in the future by a worker thread.
- *
- * @param task The task to execute
- * @param timeout Absolute time in the future to execute task.
- * @return Handle of the timer, which can be used to remove the timer.
- */
- virtual Timer add(std::shared_ptr<Runnable> task, const struct timeval& timeout);
+ virtual Timer add(std::shared_ptr<Runnable> task, const std::chrono::time_point<std::chrono::steady_clock>& abstime);
/**
* Removes a pending task
@@ -127,7 +119,7 @@ public:
private:
std::shared_ptr<const ThreadFactory> threadFactory_;
friend class Task;
- std::multimap<int64_t, std::shared_ptr<Task> > taskMap_;
+ std::multimap<std::chrono::time_point<std::chrono::steady_clock>, std::shared_ptr<Task> > taskMap_;
size_t taskCount_;
Monitor monitor_;
STATE state_;
@@ -135,7 +127,7 @@ private:
friend class Dispatcher;
std::shared_ptr<Dispatcher> dispatcher_;
std::shared_ptr<Thread> dispatcherThread_;
- typedef std::multimap<int64_t, std::shared_ptr<TimerManager::Task> >::iterator task_iterator;
+ using task_iterator = decltype(taskMap_)::iterator;
typedef std::pair<task_iterator, task_iterator> task_range;
};
}
diff --git a/lib/cpp/src/thrift/concurrency/Util.cpp b/lib/cpp/src/thrift/concurrency/Util.cpp
deleted file mode 100644
index dd6d19f97..000000000
--- a/lib/cpp/src/thrift/concurrency/Util.cpp
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <thrift/thrift-config.h>
-
-#include <thrift/Thrift.h>
-#include <thrift/concurrency/Util.h>
-
-#if defined(HAVE_SYS_TIME_H)
-#include <sys/time.h>
-#endif
-
-namespace apache {
-namespace thrift {
-namespace concurrency {
-
-int64_t Util::currentTimeTicks(int64_t ticksPerSec) {
- int64_t result;
- struct timeval now;
- int ret = THRIFT_GETTIMEOFDAY(&now, NULL);
- assert(ret == 0);
- THRIFT_UNUSED_VARIABLE(ret); // squelching "unused variable" warning
- toTicks(result, now, ticksPerSec);
- return result;
-}
-}
-}
-} // apache::thrift::concurrency
diff --git a/lib/cpp/src/thrift/concurrency/Util.h b/lib/cpp/src/thrift/concurrency/Util.h
deleted file mode 100644
index 1a915993f..000000000
--- a/lib/cpp/src/thrift/concurrency/Util.h
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#ifndef _THRIFT_CONCURRENCY_UTIL_H_
-#define _THRIFT_CONCURRENCY_UTIL_H_ 1
-
-#include <assert.h>
-#include <stddef.h>
-#include <stdint.h>
-#include <time.h>
-
-#ifdef HAVE_SYS_TIME_H
-#include <sys/time.h>
-#endif
-
-#include <thrift/transport/PlatformSocket.h>
-
-namespace apache {
-namespace thrift {
-namespace concurrency {
-
-/**
- * Utility methods
- *
- * This class contains basic utility methods for converting time formats,
- * and other common platform-dependent concurrency operations.
- * It should not be included in API headers for other concurrency library
- * headers, since it will, by definition, pull in all sorts of horrid
- * platform dependent stuff. Rather it should be inluded directly in
- * concurrency library implementation source.
- *
- * @version $Id:$
- */
-class Util {
-
- static const int64_t NS_PER_S = 1000000000LL;
- static const int64_t US_PER_S = 1000000LL;
- static const int64_t MS_PER_S = 1000LL;
-
- static const int64_t NS_PER_MS = NS_PER_S / MS_PER_S;
- static const int64_t NS_PER_US = NS_PER_S / US_PER_S;
- static const int64_t US_PER_MS = US_PER_S / MS_PER_S;
-
-public:
- /**
- * Converts millisecond timestamp into a THRIFT_TIMESPEC struct
- *
- * @param struct THRIFT_TIMESPEC& result
- * @param time or duration in milliseconds
- */
- static void toTimespec(struct THRIFT_TIMESPEC& result, int64_t value) {
- result.tv_sec = value / MS_PER_S; // ms to s
- result.tv_nsec = (value % MS_PER_S) * NS_PER_MS; // ms to ns
- }
-
- static void toTimeval(struct timeval& result, int64_t value) {
- result.tv_sec = static_cast<uint32_t>(value / MS_PER_S); // ms to s
- result.tv_usec = static_cast<uint32_t>((value % MS_PER_S) * US_PER_MS); // ms to us
- }
-
- static void toTicks(int64_t& result,
- int64_t secs,
- int64_t oldTicks,
- int64_t oldTicksPerSec,
- int64_t newTicksPerSec) {
- result = secs * newTicksPerSec;
- result += oldTicks * newTicksPerSec / oldTicksPerSec;
-
- int64_t oldPerNew = oldTicksPerSec / newTicksPerSec;
- if (oldPerNew && ((oldTicks % oldPerNew) >= (oldPerNew / 2))) {
- ++result;
- }
- }
- /**
- * Converts struct THRIFT_TIMESPEC to arbitrary-sized ticks since epoch
- */
- static void toTicks(int64_t& result, const struct THRIFT_TIMESPEC& value, int64_t ticksPerSec) {
- return toTicks(result, value.tv_sec, value.tv_nsec, NS_PER_S, ticksPerSec);
- }
-
- /**
- * Converts struct timeval to arbitrary-sized ticks since epoch
- */
- static void toTicks(int64_t& result, const struct timeval& value, int64_t ticksPerSec) {
- return toTicks(result, (unsigned long)value.tv_sec, (unsigned long)value.tv_usec, US_PER_S, ticksPerSec);
- }
-
- /**
- * Converts struct THRIFT_TIMESPEC to milliseconds
- */
- static void toMilliseconds(int64_t& result, const struct THRIFT_TIMESPEC& value) {
- return toTicks(result, value, MS_PER_S);
- }
-
- /**
- * Converts struct timeval to milliseconds
- */
- static void toMilliseconds(int64_t& result, const struct timeval& value) {
- return toTicks(result, value, MS_PER_S);
- }
-
- /**
- * Converts struct THRIFT_TIMESPEC to microseconds
- */
- static void toUsec(int64_t& result, const struct THRIFT_TIMESPEC& value) {
- return toTicks(result, value, US_PER_S);
- }
-
- /**
- * Converts struct timeval to microseconds
- */
- static void toUsec(int64_t& result, const struct timeval& value) {
- return toTicks(result, value, US_PER_S);
- }
-
- /**
- * Get current time as a number of arbitrary-size ticks from epoch
- */
- static int64_t currentTimeTicks(int64_t ticksPerSec);
-
- /**
- * Get current time as milliseconds from epoch
- */
- static int64_t currentTime() { return currentTimeTicks(MS_PER_S); }
-
- /**
- * Get current time as micros from epoch
- */
- static int64_t currentTimeUsec() { return currentTimeTicks(US_PER_S); }
-};
-}
-}
-} // apache::thrift::concurrency
-
-#endif // #ifndef _THRIFT_CONCURRENCY_UTIL_H_
diff --git a/lib/cpp/src/thrift/transport/TFileTransport.cpp b/lib/cpp/src/thrift/transport/TFileTransport.cpp
index afb441198..19058094c 100644
--- a/lib/cpp/src/thrift/transport/TFileTransport.cpp
+++ b/lib/cpp/src/thrift/transport/TFileTransport.cpp
@@ -264,7 +264,7 @@ void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen) {
// it is probably a non-factor for the time being
}
-bool TFileTransport::swapEventBuffers(struct timeval* deadline) {
+bool TFileTransport::swapEventBuffers(const std::chrono::time_point<std::chrono::steady_clock> *deadline) {
bool swap;
Guard g(mutex_);
@@ -277,7 +277,7 @@ bool TFileTransport::swapEventBuffers(struct timeval* deadline) {
} else {
if (deadline != NULL) {
// if we were handed a deadline time struct, do a timed wait
- notEmpty_.waitForTime(deadline);
+ notEmpty_.waitForTime(*deadline);
} else {
// just wait until the buffer gets an item
notEmpty_.wait();
@@ -336,8 +336,7 @@ void TFileTransport::writerThread() {
}
// Figure out the next time by which a flush must take place
- struct timeval ts_next_flush;
- getNextFlushTime(&ts_next_flush);
+ auto ts_next_flush = getNextFlushTime();
uint32_t unflushed = 0;
while (1) {
@@ -492,15 +491,13 @@ void TFileTransport::writerThread() {
} else {
struct timeval current_time;
THRIFT_GETTIMEOFDAY(&current_time, NULL);
- if (current_time.tv_sec > ts_next_flush.tv_sec
- || (current_time.tv_sec == ts_next_flush.tv_sec
- && current_time.tv_usec > ts_next_flush.tv_usec)) {
+ if (std::chrono::steady_clock::now() > ts_next_flush) {
if (unflushed > 0) {
flush = true;
} else {
// If there is no new data since the last fsync,
// don't perform the fsync, but do reset the timer.
- getNextFlushTime(&ts_next_flush);
+ ts_next_flush = getNextFlushTime();
}
}
}
@@ -509,7 +506,7 @@ void TFileTransport::writerThread() {
// sync (force flush) file to disk
THRIFT_FSYNC(fd_);
unflushed = 0;
- getNextFlushTime(&ts_next_flush);
+ ts_next_flush = getNextFlushTime();
// notify anybody waiting for flush completion
if (forced_flush) {
@@ -908,15 +905,8 @@ void TFileTransport::openLogFile() {
}
}
-void TFileTransport::getNextFlushTime(struct timeval* ts_next_flush) {
- THRIFT_GETTIMEOFDAY(ts_next_flush, NULL);
-
- ts_next_flush->tv_usec += flushMaxUs_;
- if (ts_next_flush->tv_usec > 1000000) {
- long extra_secs = ts_next_flush->tv_usec / 1000000;
- ts_next_flush->tv_usec %= 1000000;
- ts_next_flush->tv_sec += extra_secs;
- }
+std::chrono::time_point<std::chrono::steady_clock> TFileTransport::getNextFlushTime() {
+ return std::chrono::steady_clock::now() + std::chrono::microseconds(flushMaxUs_);
}
TFileTransportBuffer::TFileTransportBuffer(uint32_t size)
diff --git a/lib/cpp/src/thrift/transport/TFileTransport.h b/lib/cpp/src/thrift/transport/TFileTransport.h
index ece271aae..4290eaa66 100644
--- a/lib/cpp/src/thrift/transport/TFileTransport.h
+++ b/lib/cpp/src/thrift/transport/TFileTransport.h
@@ -267,7 +267,7 @@ public:
private:
// helper functions for writing to a file
void enqueueEvent(const uint8_t* buf, uint32_t eventLen);
- bool swapEventBuffers(struct timeval* deadline);
+ bool swapEventBuffers(const std::chrono::time_point<std::chrono::steady_clock> *deadline);
bool initBufferAndWriteThread();
// control for writer thread
@@ -286,7 +286,7 @@ private:
// Utility functions
void openLogFile();
- void getNextFlushTime(struct timeval* ts_next_flush);
+ std::chrono::time_point<std::chrono::steady_clock> getNextFlushTime();
// Class variables
readState readState_;
diff --git a/lib/cpp/test/concurrency/Tests.cpp b/lib/cpp/test/concurrency/Tests.cpp
index fc0ba7f15..019ae67f2 100644
--- a/lib/cpp/test/concurrency/Tests.cpp
+++ b/lib/cpp/test/concurrency/Tests.cpp
@@ -94,18 +94,18 @@ int main(int argc, char** argv) {
std::cout << "\t\tUtil minimum time" << std::endl;
- int64_t time00 = Util::currentTime();
- int64_t time01 = Util::currentTime();
+ int64_t time00 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
+ int64_t time01 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
std::cout << "\t\t\tMinimum time: " << time01 - time00 << "ms" << std::endl;
- time00 = Util::currentTime();
+ time00 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
time01 = time00;
size_t count = 0;
while (time01 < time00 + 10) {
count++;
- time01 = Util::currentTime();
+ time01 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
}
std::cout << "\t\t\tscall per ms: " << count / (time01 - time00) << std::endl;
diff --git a/lib/cpp/test/concurrency/ThreadFactoryTests.h b/lib/cpp/test/concurrency/ThreadFactoryTests.h
index 8ab754c89..ad1613ba9 100644
--- a/lib/cpp/test/concurrency/ThreadFactoryTests.h
+++ b/lib/cpp/test/concurrency/ThreadFactoryTests.h
@@ -22,7 +22,6 @@
#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/concurrency/Monitor.h>
#include <thrift/concurrency/Mutex.h>
-#include <thrift/concurrency/Util.h>
#include <assert.h>
#include <iostream>
@@ -221,7 +220,7 @@ public:
Monitor monitor;
- int64_t startTime = Util::currentTime();
+ int64_t startTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
for (int64_t ix = 0; ix < count; ix++) {
{
@@ -233,7 +232,7 @@ public:
}
}
- int64_t endTime = Util::currentTime();
+ int64_t endTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
bool success = (endTime - startTime) >= (count * timeout);
diff --git a/lib/cpp/test/concurrency/ThreadManagerTests.h b/lib/cpp/test/concurrency/ThreadManagerTests.h
index b3a319a57..e9ed75653 100644
--- a/lib/cpp/test/concurrency/ThreadManagerTests.h
+++ b/lib/cpp/test/concurrency/ThreadManagerTests.h
@@ -21,7 +21,6 @@
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/concurrency/Monitor.h>
-#include <thrift/concurrency/Util.h>
#include <assert.h>
#include <deque>
@@ -66,11 +65,11 @@ public:
void run() {
- _startTime = Util::currentTime();
+ _startTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
sleep_(_timeout);
- _endTime = Util::currentTime();
+ _endTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
_done = true;
@@ -123,7 +122,7 @@ public:
new ThreadManagerTests::Task(monitor, activeCount, timeout)));
}
- int64_t time00 = Util::currentTime();
+ int64_t time00 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin();
ix != tasks.end();
@@ -143,7 +142,7 @@ public:
}
}
- int64_t time01 = Util::currentTime();
+ int64_t time01 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
int64_t firstTime = 9223372036854775807LL;
int64_t lastTime = 0;
@@ -387,9 +386,9 @@ public:
bool apiTest() {
// prove currentTime has milliseconds granularity since many other things depend on it
- int64_t a = Util::currentTime();
+ int64_t a = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
sleep_(100);
- int64_t b = Util::currentTime();
+ int64_t b = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
if (b - a < 50 || b - a > 150) {
std::cerr << "\t\t\texpected 100ms gap, found " << (b-a) << "ms gap instead." << std::endl;
return false;
diff --git a/lib/cpp/test/concurrency/TimerManagerTests.h b/lib/cpp/test/concurrency/TimerManagerTests.h
index c15b14b80..24d829acf 100644
--- a/lib/cpp/test/concurrency/TimerManagerTests.h
+++ b/lib/cpp/test/concurrency/TimerManagerTests.h
@@ -20,7 +20,6 @@
#include <thrift/concurrency/TimerManager.h>
#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/concurrency/Monitor.h>
-#include <thrift/concurrency/Util.h>
#include <assert.h>
#include <iostream>
@@ -39,7 +38,7 @@ public:
public:
Task(Monitor& monitor, int64_t timeout)
: _timeout(timeout),
- _startTime(Util::currentTime()),
+ _startTime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count()),
_endTime(0),
_monitor(monitor),
_success(false),
@@ -49,7 +48,7 @@ public:
void run() {
- _endTime = Util::currentTime();
+ _endTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
_success = (_endTime - _startTime) >= _timeout;
{