/** * Copyright (C) 2015 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #include #include #include #include #include "mongo/executor/task_executor.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/future.h" #include "mongo/stdx/mutex.h" #include "mongo/util/concurrency/thread_pool_interface.h" namespace mongo { namespace executor { /** * A mock class mimicking TaskExecutor::CallbackState, does nothing. */ class MockCallbackState final : public TaskExecutor::CallbackState { public: MockCallbackState() = default; void cancel() override {} void waitForCompletion() override {} bool isCanceled() const override { return false; } }; TaskExecutor::CallbackHandle makeCallbackHandle() { return TaskExecutor::CallbackHandle(std::make_shared()); } /** * Simple future-like utility for waiting for the result of startCommand. */ template class Deferred { public: template void emplace(Args&&... args) { _emplace(_state.get(), std::forward(args)...); } T& get() { return _get(_state.get()); } bool hasCompleted() { stdx::unique_lock lk(_state->mtx); return _state->thing.is_initialized(); } template auto then(ThreadPoolInterface* pool, Continuation&& continuation) -> Deferred>().get()))> { // XXX: The ugliness of the above type signature is because you can't refer to 'this' in // a template parameter, at least on g++-4.8.2. auto state = _state; Deferred thenDeferred; pool->schedule([this, thenDeferred, continuation, state]() mutable { thenDeferred.emplace(continuation(_get(state.get()))); }); return thenDeferred; } private: struct State { stdx::mutex mtx; stdx::condition_variable cv; boost::optional thing; }; template void _emplace(State* state, Args&&... args) { stdx::lock_guard lk(_state->mtx); invariant(!state->thing.is_initialized()); state->thing.emplace(std::forward(args)...); state->cv.notify_one(); } T& _get(State* state) { stdx::unique_lock lk(state->mtx); state->cv.wait(lk, [state] { return state->thing.is_initialized(); }); return *state->thing; } private: std::shared_ptr _state = std::make_shared(); }; namespace helpers { template static Deferred> collect(std::vector>& ds, ThreadPoolInterface* pool) { Deferred> out; struct CollectState { // hack to avoid requiring U to be default constructible. std::vector> mem{}; std::size_t numFinished = 0; std::size_t goal = 0; stdx::mutex mtx; }; auto collectState = std::make_shared(); collectState->goal = ds.size(); collectState->mem.resize(collectState->goal); for (std::size_t i = 0; i < ds.size(); ++i) { ds[i].then(pool, [collectState, out, i](T res) mutable { // The bool return is unused. stdx::lock_guard lk(collectState->mtx); collectState->mem[i] = std::move(res); // If we're done. if (collectState->goal == ++collectState->numFinished) { std::vector outInitialized; outInitialized.reserve(collectState->mem.size()); for (auto&& mem_entry : collectState->mem) { outInitialized.emplace_back(std::move(*mem_entry)); } out.emplace(outInitialized); } return true; }); } return out; } } // namespace helpers } // namespace executor } // namespace mongo