diff options
Diffstat (limited to 'src/mongo/transport/grpc/mock_server_stream.cpp')
-rw-r--r-- | src/mongo/transport/grpc/mock_server_stream.cpp | 52 |
1 files changed, 39 insertions, 13 deletions
diff --git a/src/mongo/transport/grpc/mock_server_stream.cpp b/src/mongo/transport/grpc/mock_server_stream.cpp index 8c8ca1df559..8fa89bdba90 100644 --- a/src/mongo/transport/grpc/mock_server_stream.cpp +++ b/src/mongo/transport/grpc/mock_server_stream.cpp @@ -31,42 +31,68 @@ #include "mongo/db/service_context.h" #include "mongo/transport/grpc/mock_util.h" +#include "mongo/util/assert_util.h" #include "mongo/util/interruptible.h" namespace mongo::transport::grpc { -MockServerStream::MockServerStream(HostAndPort hostAndPort, - Milliseconds timeout, +MockServerStream::MockServerStream(HostAndPort remote, Promise<MetadataContainer>&& initialMetadataPromise, + Promise<::grpc::Status>&& rpcTerminationStatusPromise, + std::shared_ptr<MockCancellationState> rpcCancellationState, BidirectionalPipe::End&& serverPipeEnd, MetadataView clientMetadata) - : _deadline{getGlobalServiceContext()->getFastClockSource()->now() + timeout}, + : _remote(std::move(remote)), _initialMetadata(std::move(initialMetadataPromise)), + _rpcReturnStatus(std::move(rpcTerminationStatusPromise)), + _finalStatusReturned(false), + _rpcCancellationState(std::move(rpcCancellationState)), _pipe{std::move(serverPipeEnd)}, - _clientMetadata{std::move(clientMetadata)}, - _hostAndPort(std::move(hostAndPort)) {} + _clientMetadata{std::move(clientMetadata)} {} boost::optional<SharedBuffer> MockServerStream::read() { + invariant(!*_finalStatusReturned); + return runWithDeadline<boost::optional<SharedBuffer>>( - _deadline, [&](Interruptible* i) { return _pipe.read(i); }); + _rpcCancellationState->getDeadline(), [&](Interruptible* i) { return _pipe.read(i); }); } bool MockServerStream::isCancelled() const { - return _cancellationSource.token().isCanceled() || - getGlobalServiceContext()->getFastClockSource()->now() > _deadline; + return _rpcCancellationState->isCancelled(); } bool MockServerStream::write(ConstSharedBuffer msg) { - if (_cancellationSource.token().isCanceled() || - getGlobalServiceContext()->getFastClockSource()->now() > _deadline) { + invariant(!*_finalStatusReturned); + if (isCancelled()) { return false; } + _initialMetadata.trySend(); - return runWithDeadline<bool>(_deadline, [&](Interruptible* i) { return _pipe.write(msg, i); }); + return runWithDeadline<bool>(_rpcCancellationState->getDeadline(), + [&](Interruptible* i) { return _pipe.write(msg, i); }); } -void MockServerStream::close() { - _cancellationSource.cancel(); +void MockServerStream::sendReturnStatus(::grpc::Status status) { + { + auto finalStatusReturned = _finalStatusReturned.synchronize(); + invariant(!*finalStatusReturned); + *finalStatusReturned = true; + // Client side ignores the mocked return value in the event of a cancellation, so don't need + // to check if stream has been cancelled before sending the status. + } + _rpcReturnStatus.emplaceValue(std::move(status)); + _pipe.close(); +} + +void MockServerStream::cancel(::grpc::Status status) { + // Only mark the RPC as cancelled if a status hasn't already been returned to client. + auto finalStatusReturned = _finalStatusReturned.synchronize(); + if (*finalStatusReturned) { + return; + } + // Need to update the cancellation state before closing the pipe so that when a stream + // read/write is interrupted, the cancellation state will already be up to date. + _rpcCancellationState->cancel(std::move(status)); _pipe.close(); } |