diff options
Diffstat (limited to 'src/mongo/transport/grpc/mock_client_stream.cpp')
-rw-r--r-- | src/mongo/transport/grpc/mock_client_stream.cpp | 50 |
1 files changed, 44 insertions, 6 deletions
diff --git a/src/mongo/transport/grpc/mock_client_stream.cpp b/src/mongo/transport/grpc/mock_client_stream.cpp index 0952c8be49c..dfc00f8c51a 100644 --- a/src/mongo/transport/grpc/mock_client_stream.cpp +++ b/src/mongo/transport/grpc/mock_client_stream.cpp @@ -29,26 +29,64 @@ #include "mongo/transport/grpc/mock_client_stream.h" -#include "mongo/db/service_context.h" #include "mongo/transport/grpc/mock_util.h" +#include "mongo/util/interruptible.h" namespace mongo::transport::grpc { -MockClientStream::MockClientStream(HostAndPort hostAndPort, - Milliseconds timeout, +MockClientStream::MockClientStream(HostAndPort remote, Future<MetadataContainer>&& initialMetadataFuture, + Future<::grpc::Status>&& rpcReturnStatus, + std::shared_ptr<MockCancellationState> rpcCancellationState, BidirectionalPipe::End&& pipe) - : _deadline{getGlobalServiceContext()->getFastClockSource()->now() + timeout}, + : _remote{std::move(remote)}, _serverInitialMetadata{std::move(initialMetadataFuture)}, + _rpcReturnStatus{std::move(rpcReturnStatus)}, + _rpcCancellationState(std::move(rpcCancellationState)), _pipe{std::move(pipe)} {} boost::optional<SharedBuffer> MockClientStream::read() { + // Even if the server side handler of this stream has set a final status for the RPC (i.e. + // _rpcReturnStatus is ready), there may still be unread messages in the queue that the server + // sent before setting that status, so only return early here if the RPC was cancelled. + // Otherwise, try to read whatever messages are in the queue. + if (_rpcCancellationState->isCancelled()) { + return boost::none; + } + return runWithDeadline<boost::optional<SharedBuffer>>( - _deadline, [&](Interruptible* i) { return _pipe.read(i); }); + _rpcCancellationState->getDeadline(), [&](Interruptible* i) { return _pipe.read(i); }); } bool MockClientStream::write(ConstSharedBuffer msg) { - return runWithDeadline<bool>(_deadline, [&](Interruptible* i) { return _pipe.write(msg, i); }); + if (_rpcCancellationState->isCancelled() || _rpcReturnStatus.isReady()) { + return false; + } + + return runWithDeadline<bool>(_rpcCancellationState->getDeadline(), + [&](Interruptible* i) { return _pipe.write(msg, i); }); +} + +::grpc::Status MockClientStream::finish() { + // We use a busy wait here because there is no easy way to wait until all the messages in the + // pipe have been read. + while (!_pipe.isConsumed() && !_rpcCancellationState->isDeadlineExceeded()) { + sleepFor(Milliseconds(1)); + } + + invariant(_rpcReturnStatus.isReady() || _rpcCancellationState->isCancelled()); + + if (auto cancellationStatus = _rpcCancellationState->getCancellationStatus(); + cancellationStatus.has_value()) { + return *cancellationStatus; + } + + return _rpcReturnStatus.get(); +} + +void MockClientStream::_cancel() { + _rpcCancellationState->cancel(::grpc::Status::CANCELLED); + _pipe.close(); } } // namespace mongo::transport::grpc |