/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include
#include
#include
#include
#include "mongo/executor/async_stream_factory_interface.h"
#include "mongo/executor/async_stream_interface.h"
#include "mongo/executor/remote_command_request.h"
#include "mongo/executor/remote_command_response.h"
#include "mongo/rpc/protocol.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/functional.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/unordered_map.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/net/hostandport.h"
namespace mongo {
namespace executor {
class AsyncStreamInterface;
/**
* A factory that produces mock streams to allow for testing of NetworkInterfaceASIO.
*
* The streams produced by this factory simulate a flow of Events (ConnectEvent,
* ReadEvent, WriteEvent). The streams created by this factory will automatically
* pause themselves at each Event, and the caller must unblock them by destroying
* the Event object to continue.
*
* Example use of this factory:
*
* AsyncMockStreamFactory factory();
*
* // NIA will then call makeStream(...) to create new streams from the
* // factory, or the caller can do this manually.
*
* // Wait for the desired stream to exist
* auto stream = streamFactory.blockUntilStreamExists(host);
*
* // If we do not care to inspect after a certain event, we can skip it:
* ConnectEvent{stream}.skip();
*
* // To examine the stream at an Event, instantiate the event object.
* // When the Event object goes out of scope the stream will unblock.
* {
* WriteEvent write{stream};
*
* // Inspect what NIA wrote to this stream:
* auto messageData = stream->popWrite();
* ...
* }
*
* // The Event object will keep the stream blocked as long as it exists.
* // Use this window to perform operations on the stream or inspect it.
* {
* ReadEvent read{stream};
*
* // Simulate data sent to this stream over the network
* stream->pushRead( ... );
*
* // Or, simulate a networking error
* stream->setError( error_code );
* }
*/
class AsyncMockStreamFactory final : public AsyncStreamFactoryInterface {
public:
AsyncMockStreamFactory() = default;
std::unique_ptr makeStream(asio::io_service::strand* strand,
const HostAndPort& host) override;
/**
* A mock stream class for testing the egress networking layer.
*
* At the core of this class is an idea of deferring actions and allowing inspection
* of state of the stream before those actions happen.
*
* This class operates on the assumption that two threads are in use: a networking
* thread used by NIA to issue IO calls on the MockStream, and a test thread to
* wait on those calls and react.
*
* When the test thread creates an Event object, the constructor sends it to wait
* on a condition variable. When NIA issues an IO call on the stream, the MockStream
* load the proper handler into a placeholder, and then calls notify() on the
* condition variable. At that point the stream is paused and the test thread
* may operate on it.
*/
class MockStream final : public AsyncStreamInterface {
public:
MockStream(asio::io_service::strand* strand,
AsyncMockStreamFactory* factory,
const HostAndPort& target);
// Use unscoped enum so we can specialize on it
enum StreamState {
kRunning,
kBlockedBeforeConnect,
kBlockedBeforeRead,
kBlockedAfterWrite,
kCanceled,
};
~MockStream();
void connect(asio::ip::tcp::resolver::iterator endpoints,
ConnectHandler&& connectHandler) override;
void write(asio::const_buffer buf, StreamHandler&& writeHandler) override;
void read(asio::mutable_buffer buf, StreamHandler&& readHandler) override;
bool isOpen() override;
HostAndPort target();
StreamState waitUntilBlocked();
void cancel() override;
std::vector popWrite();
void pushRead(std::vector toRead);
void setError(std::error_code ec);
void unblock();
void simulateServer(
rpc::Protocol proto,
const stdx::function replyFunc);
private:
using Action = stdx::function;
void _defer(StreamState state, Action&& handler);
void _defer_inlock(StreamState state, Action&& handler);
void _unblock_inlock();
asio::io_service::strand* _strand;
AsyncMockStreamFactory* _factory;
HostAndPort _target;
stdx::mutex _mutex;
stdx::condition_variable _deferredCV;
StreamState _state{kRunning};
std::queue> _readQueue;
std::queue> _writeQueue;
std::error_code _error;
Action _deferredAction;
};
MockStream* blockUntilStreamExists(const HostAndPort& host);
private:
void _createStream(const HostAndPort& host, MockStream* stream);
void _destroyStream(const HostAndPort& host);
stdx::mutex _factoryMutex;
stdx::condition_variable _factoryCv;
stdx::unordered_map _streams;
};
template
class StreamEvent {
public:
StreamEvent(AsyncMockStreamFactory::MockStream* stream) : _stream(stream) {
ASSERT(stream->waitUntilBlocked() == EventType);
}
void skip() {
_stream->unblock();
skipped = true;
}
~StreamEvent() {
if (!skipped) {
skip();
}
}
private:
bool skipped = false;
AsyncMockStreamFactory::MockStream* _stream = nullptr;
};
using ReadEvent = StreamEvent;
using WriteEvent = StreamEvent;
using ConnectEvent =
StreamEvent;
} // namespace executor
} // namespace mongo