summaryrefslogtreecommitdiff
path: root/src/mongo/executor/async_mock_stream_factory.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/executor/async_mock_stream_factory.cpp')
-rw-r--r--src/mongo/executor/async_mock_stream_factory.cpp335
1 files changed, 0 insertions, 335 deletions
diff --git a/src/mongo/executor/async_mock_stream_factory.cpp b/src/mongo/executor/async_mock_stream_factory.cpp
deleted file mode 100644
index 0d57e80ac5b..00000000000
--- a/src/mongo/executor/async_mock_stream_factory.cpp
+++ /dev/null
@@ -1,335 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/executor/async_mock_stream_factory.h"
-
-#include <exception>
-#include <iterator>
-#include <system_error>
-
-#include "mongo/base/system_error.h"
-#include "mongo/rpc/command_reply_builder.h"
-#include "mongo/rpc/factory.h"
-#include "mongo/rpc/legacy_reply_builder.h"
-#include "mongo/rpc/message.h"
-#include "mongo/stdx/memory.h"
-#include "mongo/util/assert_util.h"
-#include "mongo/util/log.h"
-
-namespace mongo {
-namespace executor {
-
-namespace {
-StringData stateToString(AsyncMockStreamFactory::MockStream::StreamState state) {
- switch (state) {
- case AsyncMockStreamFactory::MockStream::StreamState::kRunning:
- return "Running";
- case AsyncMockStreamFactory::MockStream::StreamState::kBlockedBeforeConnect:
- return "Blocked before connect";
- case AsyncMockStreamFactory::MockStream::StreamState::kBlockedAfterWrite:
- return "Blocked after write";
- case AsyncMockStreamFactory::MockStream::StreamState::kBlockedBeforeRead:
- return "Blocked before read";
- case AsyncMockStreamFactory::MockStream::StreamState::kCanceled:
- return "Canceled";
- }
- MONGO_UNREACHABLE;
-}
-
-template <typename Handler>
-void checkCanceled(asio::io_service::strand* strand,
- AsyncMockStreamFactory::MockStream::StreamState* state,
- Handler&& handler,
- std::size_t bytes,
- std::error_code ec = std::error_code()) {
- auto wasCancelled = (*state == AsyncMockStreamFactory::MockStream::StreamState::kCanceled);
- *state = AsyncMockStreamFactory::MockStream::StreamState::kRunning;
- strand->post([handler, wasCancelled, bytes, ec] {
- handler(wasCancelled ? make_error_code(asio::error::operation_aborted) : ec, bytes);
- });
-}
-
-} // namespace
-
-std::unique_ptr<AsyncStreamInterface> AsyncMockStreamFactory::makeStream(
- asio::io_service::strand* strand, const HostAndPort& target) {
- return stdx::make_unique<MockStream>(strand, this, target);
-}
-
-void AsyncMockStreamFactory::_createStream(const HostAndPort& host, MockStream* stream) {
- stdx::lock_guard<stdx::mutex> lk(_factoryMutex);
- log() << "creating stream for: " << host;
- _streams.emplace(host, stream);
- _factoryCv.notify_all();
-}
-
-void AsyncMockStreamFactory::_destroyStream(const HostAndPort& host) {
- stdx::lock_guard<stdx::mutex> lk(_factoryMutex);
- log() << "destroying stream for: " << host;
- _streams.erase(host);
-}
-
-AsyncMockStreamFactory::MockStream* AsyncMockStreamFactory::blockUntilStreamExists(
- const HostAndPort& host) {
- stdx::unique_lock<stdx::mutex> lk(_factoryMutex);
-
- auto iter = std::begin(_streams);
-
- _factoryCv.wait(lk, [&] { return (iter = _streams.find(host)) != std::end(_streams); });
-
- return iter->second;
-}
-
-AsyncMockStreamFactory::MockStream::MockStream(asio::io_service::strand* strand,
- AsyncMockStreamFactory* factory,
- const HostAndPort& target)
- : _strand(strand), _factory(factory), _target(target) {
- _factory->_createStream(_target, this);
-}
-
-AsyncMockStreamFactory::MockStream::~MockStream() {
- _factory->_destroyStream(_target);
-}
-
-void AsyncMockStreamFactory::MockStream::connect(asio::ip::tcp::resolver::iterator endpoints,
- ConnectHandler&& connectHandler) {
- // Suspend execution after "connecting"
- _defer(kBlockedBeforeConnect, [this, connectHandler, endpoints]() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
-
- // We shim a lambda to give connectHandler the right signature since it doesn't take
- // a size_t param.
- checkCanceled(
- _strand,
- &_state,
- [connectHandler](std::error_code ec, std::size_t) { return connectHandler(ec); },
- 0);
- });
-}
-
-void AsyncMockStreamFactory::MockStream::write(asio::const_buffer buf,
- StreamHandler&& writeHandler) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
-
- auto begin = asio::buffer_cast<const uint8_t*>(buf);
- auto size = asio::buffer_size(buf);
- _writeQueue.push({begin, begin + size});
-
- // Suspend execution after data is written.
- _defer_inlock(kBlockedAfterWrite, [this, writeHandler, size]() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- checkCanceled(_strand, &_state, std::move(writeHandler), size);
- });
-}
-
-void AsyncMockStreamFactory::MockStream::cancel() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- log() << "cancel() for: " << _target;
-
- // If _state is kRunning then we don't have a deferred operation to cancel.
- if (_state == kRunning) {
- return;
- }
-
- _state = kCanceled;
-}
-
-void AsyncMockStreamFactory::MockStream::read(asio::mutable_buffer buf,
- StreamHandler&& readHandler) {
- // Suspend execution before data is read.
- _defer(kBlockedBeforeRead, [this, buf, readHandler]() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- int nToCopy = 0;
-
- // If we've set an error, return that instead of a read.
- if (!_error) {
- auto nextRead = std::move(_readQueue.front());
- _readQueue.pop();
-
- auto beginDst = asio::buffer_cast<uint8_t*>(buf);
- nToCopy = std::min(nextRead.size(), asio::buffer_size(buf));
-
- auto endSrc = std::begin(nextRead);
- std::advance(endSrc, nToCopy);
-
- auto endDst = std::copy(std::begin(nextRead), endSrc, beginDst);
- invariant((endDst - beginDst) == static_cast<std::ptrdiff_t>(nToCopy));
- log() << "read " << nToCopy << " bytes, " << (nextRead.size() - nToCopy)
- << " remaining in buffer";
- }
-
- auto handler = readHandler;
-
- // If we did not receive all the bytes, we should return an error
- if (static_cast<size_t>(nToCopy) < asio::buffer_size(buf)) {
- handler = [readHandler](std::error_code ec, size_t len) {
- // If we have an error here we've been canceled, and that takes precedence
- if (ec)
- return readHandler(ec, len);
-
- // Call the original handler with an error
- readHandler(make_error_code(ErrorCodes::InvalidLength), len);
- };
- }
-
- checkCanceled(_strand, &_state, std::move(handler), nToCopy, _error);
- _error.clear();
- });
-}
-
-void AsyncMockStreamFactory::MockStream::pushRead(std::vector<uint8_t> toRead) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- invariant(_state != kRunning);
- _readQueue.emplace(std::move(toRead));
-}
-
-void AsyncMockStreamFactory::MockStream::setError(std::error_code ec) {
- _error = ec;
-}
-
-std::vector<uint8_t> AsyncMockStreamFactory::MockStream::popWrite() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- invariant(_state != kRunning);
- auto nextWrite = std::move(_writeQueue.front());
- _writeQueue.pop();
- return nextWrite;
-}
-
-void AsyncMockStreamFactory::MockStream::_defer(StreamState state, Action&& handler) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- _defer_inlock(state, std::move(handler));
-}
-
-void AsyncMockStreamFactory::MockStream::_defer_inlock(StreamState state, Action&& handler) {
- invariant(_state == kRunning);
- _state = state;
-
- invariant(!_deferredAction);
- _deferredAction = std::move(handler);
- _deferredCV.notify_one();
-}
-
-void AsyncMockStreamFactory::MockStream::unblock() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- _unblock_inlock();
-}
-
-void AsyncMockStreamFactory::MockStream::_unblock_inlock() {
- // Can be canceled here at which point we will call the handler with the CallbackCanceled
- // status when we invoke the _deferredAction.
- invariant(_state != kRunning);
-
- if (_state != kCanceled) {
- _state = kRunning;
- }
- // Post our deferred action to resume state machine execution
- invariant(_deferredAction);
- _strand->post(std::move(_deferredAction));
- _deferredAction = nullptr;
-}
-
-auto AsyncMockStreamFactory::MockStream::waitUntilBlocked() -> StreamState {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- _deferredCV.wait(lk, [this]() { return _state != kRunning; });
- log() << "returning from waitUntilBlocked, state: " << stateToString(_state);
- return _state;
-}
-
-HostAndPort AsyncMockStreamFactory::MockStream::target() {
- return _target;
-}
-
-void AsyncMockStreamFactory::MockStream::simulateServer(
- rpc::Protocol proto,
- const stdx::function<RemoteCommandResponse(RemoteCommandRequest)> replyFunc) {
- std::exception_ptr ex;
- uint32_t messageId = 0;
-
- RemoteCommandResponse resp;
- {
- WriteEvent write{this};
-
- std::vector<uint8_t> messageData = popWrite();
- Message msg(SharedBuffer::allocate(messageData.size()));
- memcpy(msg.buf(), messageData.data(), messageData.size());
-
- auto request = rpc::opMsgRequestFromAnyProtocol(msg);
- ASSERT(rpc::protocolForMessage(msg) == proto);
-
- RemoteCommandRequest rcr(target(), request.getDatabase().toString(), request.body, nullptr);
-
- messageId = msg.header().getId();
-
- // So we can allow ASSERTs in replyFunc, we capture any exceptions, but rethrow
- // them later to prevent deadlock
- try {
- resp = replyFunc(std::move(rcr));
- } catch (...) {
- ex = std::current_exception();
- }
- }
-
- auto replyBuilder = rpc::makeReplyBuilder(proto);
- replyBuilder->setCommandReply(resp.data);
- replyBuilder->setMetadata(resp.metadata);
-
- auto replyMsg = replyBuilder->done();
- replyMsg.header().setResponseToMsgId(messageId);
-
- {
- // The first read will be for the header.
- ReadEvent read{this};
- auto hdrBytes = reinterpret_cast<const uint8_t*>(replyMsg.header().view2ptr());
- pushRead({hdrBytes, hdrBytes + sizeof(MSGHEADER::Value)});
- }
-
- {
- // The second read will be for the message data.
- ReadEvent read{this};
- auto dataBytes = reinterpret_cast<const uint8_t*>(replyMsg.buf());
- auto pastHeader = dataBytes;
- std::advance(pastHeader, sizeof(MSGHEADER::Value));
- pushRead({pastHeader, dataBytes + static_cast<std::size_t>(replyMsg.size())});
- }
-
- if (ex) {
- // Rethrow ASSERTS after the NIA completes it's Write-Read-Read sequence.
- std::rethrow_exception(ex);
- }
-}
-
-bool AsyncMockStreamFactory::MockStream::isOpen() {
- return true;
-}
-
-} // namespace executor
-} // namespace mongo