summaryrefslogtreecommitdiff
path: root/src/mongo/transport/grpc/mock_server_stream.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/transport/grpc/mock_server_stream.cpp')
-rw-r--r--src/mongo/transport/grpc/mock_server_stream.cpp52
1 files changed, 39 insertions, 13 deletions
diff --git a/src/mongo/transport/grpc/mock_server_stream.cpp b/src/mongo/transport/grpc/mock_server_stream.cpp
index 8c8ca1df559..8fa89bdba90 100644
--- a/src/mongo/transport/grpc/mock_server_stream.cpp
+++ b/src/mongo/transport/grpc/mock_server_stream.cpp
@@ -31,42 +31,68 @@
#include "mongo/db/service_context.h"
#include "mongo/transport/grpc/mock_util.h"
+#include "mongo/util/assert_util.h"
#include "mongo/util/interruptible.h"
namespace mongo::transport::grpc {
-MockServerStream::MockServerStream(HostAndPort hostAndPort,
- Milliseconds timeout,
+MockServerStream::MockServerStream(HostAndPort remote,
Promise<MetadataContainer>&& initialMetadataPromise,
+ Promise<::grpc::Status>&& rpcTerminationStatusPromise,
+ std::shared_ptr<MockCancellationState> rpcCancellationState,
BidirectionalPipe::End&& serverPipeEnd,
MetadataView clientMetadata)
- : _deadline{getGlobalServiceContext()->getFastClockSource()->now() + timeout},
+ : _remote(std::move(remote)),
_initialMetadata(std::move(initialMetadataPromise)),
+ _rpcReturnStatus(std::move(rpcTerminationStatusPromise)),
+ _finalStatusReturned(false),
+ _rpcCancellationState(std::move(rpcCancellationState)),
_pipe{std::move(serverPipeEnd)},
- _clientMetadata{std::move(clientMetadata)},
- _hostAndPort(std::move(hostAndPort)) {}
+ _clientMetadata{std::move(clientMetadata)} {}
boost::optional<SharedBuffer> MockServerStream::read() {
+ invariant(!*_finalStatusReturned);
+
return runWithDeadline<boost::optional<SharedBuffer>>(
- _deadline, [&](Interruptible* i) { return _pipe.read(i); });
+ _rpcCancellationState->getDeadline(), [&](Interruptible* i) { return _pipe.read(i); });
}
bool MockServerStream::isCancelled() const {
- return _cancellationSource.token().isCanceled() ||
- getGlobalServiceContext()->getFastClockSource()->now() > _deadline;
+ return _rpcCancellationState->isCancelled();
}
bool MockServerStream::write(ConstSharedBuffer msg) {
- if (_cancellationSource.token().isCanceled() ||
- getGlobalServiceContext()->getFastClockSource()->now() > _deadline) {
+ invariant(!*_finalStatusReturned);
+ if (isCancelled()) {
return false;
}
+
_initialMetadata.trySend();
- return runWithDeadline<bool>(_deadline, [&](Interruptible* i) { return _pipe.write(msg, i); });
+ return runWithDeadline<bool>(_rpcCancellationState->getDeadline(),
+ [&](Interruptible* i) { return _pipe.write(msg, i); });
}
-void MockServerStream::close() {
- _cancellationSource.cancel();
+void MockServerStream::sendReturnStatus(::grpc::Status status) {
+ {
+ auto finalStatusReturned = _finalStatusReturned.synchronize();
+ invariant(!*finalStatusReturned);
+ *finalStatusReturned = true;
+ // Client side ignores the mocked return value in the event of a cancellation, so don't need
+ // to check if stream has been cancelled before sending the status.
+ }
+ _rpcReturnStatus.emplaceValue(std::move(status));
+ _pipe.close();
+}
+
+void MockServerStream::cancel(::grpc::Status status) {
+ // Only mark the RPC as cancelled if a status hasn't already been returned to client.
+ auto finalStatusReturned = _finalStatusReturned.synchronize();
+ if (*finalStatusReturned) {
+ return;
+ }
+ // Need to update the cancellation state before closing the pipe so that when a stream
+ // read/write is interrupted, the cancellation state will already be up to date.
+ _rpcCancellationState->cancel(std::move(status));
_pipe.close();
}