/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include #include #include #include #include #include namespace apache { namespace thrift { namespace concurrency { using std::shared_ptr; using std::weak_ptr; /** * TimerManager class * * @version $Id:$ */ class TimerManager::Task : public Runnable { public: enum STATE { WAITING, EXECUTING, CANCELLED, COMPLETE }; Task(shared_ptr runnable) : runnable_(runnable), state_(WAITING) {} ~Task() override = default; void run() override { if (state_ == EXECUTING) { runnable_->run(); state_ = COMPLETE; } } bool operator==(const shared_ptr & runnable) const { return runnable_ == runnable; } task_iterator it_; private: shared_ptr runnable_; friend class TimerManager::Dispatcher; STATE state_; }; class TimerManager::Dispatcher : public Runnable { public: Dispatcher(TimerManager* manager) : manager_(manager) {} ~Dispatcher() override = default; /** * Dispatcher entry point * * As long as dispatcher thread is running, pull tasks off the task taskMap_ * and execute. */ void run() override { { Synchronized s(manager_->monitor_); if (manager_->state_ == TimerManager::STARTING) { manager_->state_ = TimerManager::STARTED; manager_->monitor_.notifyAll(); } } do { std::set > expiredTasks; { Synchronized s(manager_->monitor_); task_iterator expiredTaskEnd; auto now = std::chrono::steady_clock::now(); while (manager_->state_ == TimerManager::STARTED && (expiredTaskEnd = manager_->taskMap_.upper_bound(now)) == manager_->taskMap_.begin()) { std::chrono::milliseconds timeout(0); if (!manager_->taskMap_.empty()) { timeout = std::chrono::duration_cast(manager_->taskMap_.begin()->first - now); //because the unit of steady_clock is smaller than millisecond,timeout may be 0. if (timeout.count() == 0) { timeout = std::chrono::milliseconds(1); } manager_->monitor_.waitForTimeRelative(timeout); } else { manager_->monitor_.waitForTimeRelative(0); } now = std::chrono::steady_clock::now(); } if (manager_->state_ == TimerManager::STARTED) { for (auto ix = manager_->taskMap_.begin(); ix != expiredTaskEnd; ix++) { shared_ptr task = ix->second; expiredTasks.insert(task); task->it_ = manager_->taskMap_.end(); if (task->state_ == TimerManager::Task::WAITING) { task->state_ = TimerManager::Task::EXECUTING; } manager_->taskCount_--; } manager_->taskMap_.erase(manager_->taskMap_.begin(), expiredTaskEnd); } } for (const auto & expiredTask : expiredTasks) { expiredTask->run(); } } while (manager_->state_ == TimerManager::STARTED); { Synchronized s(manager_->monitor_); if (manager_->state_ == TimerManager::STOPPING) { manager_->state_ = TimerManager::STOPPED; manager_->monitor_.notifyAll(); } } return; } private: TimerManager* manager_; friend class TimerManager; }; #if defined(_MSC_VER) #pragma warning(push) #pragma warning(disable : 4355) // 'this' used in base member initializer list #endif TimerManager::TimerManager() : taskCount_(0), state_(TimerManager::UNINITIALIZED), dispatcher_(std::make_shared(this)) { } #if defined(_MSC_VER) #pragma warning(pop) #endif TimerManager::~TimerManager() { // If we haven't been explicitly stopped, do so now. We don't need to grab // the monitor here, since stop already takes care of reentrancy. if (state_ != STOPPED) { try { stop(); } catch (...) { // We're really hosed. } } } void TimerManager::start() { bool doStart = false; { Synchronized s(monitor_); if (!threadFactory_) { throw InvalidArgumentException(); } if (state_ == TimerManager::UNINITIALIZED) { state_ = TimerManager::STARTING; doStart = true; } } if (doStart) { dispatcherThread_ = threadFactory_->newThread(dispatcher_); dispatcherThread_->start(); } { Synchronized s(monitor_); while (state_ == TimerManager::STARTING) { monitor_.wait(); } assert(state_ != TimerManager::STARTING); } } void TimerManager::stop() { bool doStop = false; { Synchronized s(monitor_); if (state_ == TimerManager::UNINITIALIZED) { state_ = TimerManager::STOPPED; } else if (state_ != STOPPING && state_ != STOPPED) { doStop = true; state_ = STOPPING; monitor_.notifyAll(); } while (state_ != STOPPED) { monitor_.wait(); } } if (doStop) { // Clean up any outstanding tasks taskMap_.clear(); // Remove dispatcher's reference to us. dispatcher_->manager_ = nullptr; } } shared_ptr TimerManager::threadFactory() const { Synchronized s(monitor_); return threadFactory_; } void TimerManager::threadFactory(shared_ptr value) { Synchronized s(monitor_); threadFactory_ = value; } size_t TimerManager::taskCount() const { return taskCount_; } TimerManager::Timer TimerManager::add(shared_ptr task, const std::chrono::milliseconds &timeout) { return add(task, std::chrono::steady_clock::now() + timeout); } TimerManager::Timer TimerManager::add(shared_ptr task, const std::chrono::time_point& abstime) { auto now = std::chrono::steady_clock::now(); if (abstime < now) { throw InvalidArgumentException(); } Synchronized s(monitor_); if (state_ != TimerManager::STARTED) { throw IllegalStateException(); } // If the task map is empty, we will kick the dispatcher for sure. Otherwise, we kick him // if the expiration time is shorter than the current value. Need to test before we insert, // because the new task might insert at the front. bool notifyRequired = (taskCount_ == 0) ? true : abstime < taskMap_.begin()->first; shared_ptr timer(new Task(task)); taskCount_++; timer->it_ = taskMap_.emplace(abstime, timer); // If the task map was empty, or if we have an expiration that is earlier // than any previously seen, kick the dispatcher so it can update its // timeout if (notifyRequired) { monitor_.notify(); } return timer; } void TimerManager::remove(shared_ptr task) { Synchronized s(monitor_); if (state_ != TimerManager::STARTED) { throw IllegalStateException(); } bool found = false; for (auto ix = taskMap_.begin(); ix != taskMap_.end();) { if (*ix->second == task) { found = true; taskCount_--; taskMap_.erase(ix++); } else { ++ix; } } if (!found) { throw NoSuchTaskException(); } } void TimerManager::remove(Timer handle) { Synchronized s(monitor_); if (state_ != TimerManager::STARTED) { throw IllegalStateException(); } shared_ptr task = handle.lock(); if (!task) { throw NoSuchTaskException(); } if (task->it_ == taskMap_.end()) { // Task is being executed throw UncancellableTaskException(); } taskMap_.erase(task->it_); taskCount_--; } TimerManager::STATE TimerManager::state() const { return state_; } } } } // apache::thrift::concurrency