summaryrefslogtreecommitdiff
path: root/src/mongo/transport/grpc/bidirectional_pipe.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/transport/grpc/bidirectional_pipe.h')
-rw-r--r--src/mongo/transport/grpc/bidirectional_pipe.h31
1 files changed, 25 insertions, 6 deletions
diff --git a/src/mongo/transport/grpc/bidirectional_pipe.h b/src/mongo/transport/grpc/bidirectional_pipe.h
index f1c29471eb0..b53d61b519c 100644
--- a/src/mongo/transport/grpc/bidirectional_pipe.h
+++ b/src/mongo/transport/grpc/bidirectional_pipe.h
@@ -89,20 +89,36 @@ public:
}
/**
- * Close both ends of the pipe. In progress reads and writes on either end will be
- * interrupted.
+ * Close both the read and write halves of this end of the pipe. In-progress reads and
+ * writes on this end and writes on the other end will be interrupted.
+ *
+ * Messages that have already been transmitted through this end of the pipe can still be
+ * read by the other end.
*/
void close() {
_sendHalf.close();
_recvHalf.close();
}
+ /**
+ * Returns true when at least one of the following conditions is met:
+ * - This end of the pipe is closed.
+ * - The other end of the pipe is closed and there are no more messages to be read.
+ */
+ bool isConsumed() const {
+ auto stats = _recvHalfCtrl.getStats();
+ return stats.consumerEndClosed || (stats.queueDepth == 0 && stats.producerEndClosed);
+ }
+
private:
friend BidirectionalPipe;
explicit End(SingleProducerSingleConsumerQueue<SharedBuffer>::Producer send,
- SingleProducerSingleConsumerQueue<SharedBuffer>::Consumer recv)
- : _sendHalf{std::move(send)}, _recvHalf{std::move(recv)} {}
+ SingleProducerSingleConsumerQueue<SharedBuffer>::Consumer recv,
+ SingleProducerSingleConsumerQueue<SharedBuffer>::Controller recvCtrl)
+ : _sendHalf{std::move(send)},
+ _recvHalf{std::move(recv)},
+ _recvHalfCtrl(std::move(recvCtrl)) {}
bool _isPipeClosedError(const DBException& e) const {
return e.code() == ErrorCodes::ProducerConsumerQueueEndClosed ||
@@ -111,14 +127,17 @@ public:
SingleProducerSingleConsumerQueue<SharedBuffer>::Producer _sendHalf;
SingleProducerSingleConsumerQueue<SharedBuffer>::Consumer _recvHalf;
+ SingleProducerSingleConsumerQueue<SharedBuffer>::Controller _recvHalfCtrl;
};
BidirectionalPipe() {
SingleProducerSingleConsumerQueue<SharedBuffer>::Pipe pipe1;
SingleProducerSingleConsumerQueue<SharedBuffer>::Pipe pipe2;
- left = std::unique_ptr<End>(new End(std::move(pipe1.producer), std::move(pipe2.consumer)));
- right = std::unique_ptr<End>(new End(std::move(pipe2.producer), std::move(pipe1.consumer)));
+ left = std::unique_ptr<End>(new End(
+ std::move(pipe1.producer), std::move(pipe2.consumer), std::move(pipe2.controller)));
+ right = std::unique_ptr<End>(new End(
+ std::move(pipe2.producer), std::move(pipe1.consumer), std::move(pipe1.controller)));
}
std::unique_ptr<End> left;