summaryrefslogtreecommitdiff
path: root/src/mongo/transport/grpc/mock_client_stream.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/transport/grpc/mock_client_stream.cpp')
-rw-r--r--src/mongo/transport/grpc/mock_client_stream.cpp50
1 files changed, 44 insertions, 6 deletions
diff --git a/src/mongo/transport/grpc/mock_client_stream.cpp b/src/mongo/transport/grpc/mock_client_stream.cpp
index 0952c8be49c..dfc00f8c51a 100644
--- a/src/mongo/transport/grpc/mock_client_stream.cpp
+++ b/src/mongo/transport/grpc/mock_client_stream.cpp
@@ -29,26 +29,64 @@
#include "mongo/transport/grpc/mock_client_stream.h"
-#include "mongo/db/service_context.h"
#include "mongo/transport/grpc/mock_util.h"
+#include "mongo/util/interruptible.h"
namespace mongo::transport::grpc {
-MockClientStream::MockClientStream(HostAndPort hostAndPort,
- Milliseconds timeout,
+MockClientStream::MockClientStream(HostAndPort remote,
Future<MetadataContainer>&& initialMetadataFuture,
+ Future<::grpc::Status>&& rpcReturnStatus,
+ std::shared_ptr<MockCancellationState> rpcCancellationState,
BidirectionalPipe::End&& pipe)
- : _deadline{getGlobalServiceContext()->getFastClockSource()->now() + timeout},
+ : _remote{std::move(remote)},
_serverInitialMetadata{std::move(initialMetadataFuture)},
+ _rpcReturnStatus{std::move(rpcReturnStatus)},
+ _rpcCancellationState(std::move(rpcCancellationState)),
_pipe{std::move(pipe)} {}
boost::optional<SharedBuffer> MockClientStream::read() {
+ // Even if the server side handler of this stream has set a final status for the RPC (i.e.
+ // _rpcReturnStatus is ready), there may still be unread messages in the queue that the server
+ // sent before setting that status, so only return early here if the RPC was cancelled.
+ // Otherwise, try to read whatever messages are in the queue.
+ if (_rpcCancellationState->isCancelled()) {
+ return boost::none;
+ }
+
return runWithDeadline<boost::optional<SharedBuffer>>(
- _deadline, [&](Interruptible* i) { return _pipe.read(i); });
+ _rpcCancellationState->getDeadline(), [&](Interruptible* i) { return _pipe.read(i); });
}
bool MockClientStream::write(ConstSharedBuffer msg) {
- return runWithDeadline<bool>(_deadline, [&](Interruptible* i) { return _pipe.write(msg, i); });
+ if (_rpcCancellationState->isCancelled() || _rpcReturnStatus.isReady()) {
+ return false;
+ }
+
+ return runWithDeadline<bool>(_rpcCancellationState->getDeadline(),
+ [&](Interruptible* i) { return _pipe.write(msg, i); });
+}
+
+::grpc::Status MockClientStream::finish() {
+ // We use a busy wait here because there is no easy way to wait until all the messages in the
+ // pipe have been read.
+ while (!_pipe.isConsumed() && !_rpcCancellationState->isDeadlineExceeded()) {
+ sleepFor(Milliseconds(1));
+ }
+
+ invariant(_rpcReturnStatus.isReady() || _rpcCancellationState->isCancelled());
+
+ if (auto cancellationStatus = _rpcCancellationState->getCancellationStatus();
+ cancellationStatus.has_value()) {
+ return *cancellationStatus;
+ }
+
+ return _rpcReturnStatus.get();
+}
+
+void MockClientStream::_cancel() {
+ _rpcCancellationState->cancel(::grpc::Status::CANCELLED);
+ _pipe.close();
}
} // namespace mongo::transport::grpc