diff options
Diffstat (limited to 'src/mongo/transport/grpc/bidirectional_pipe.h')
-rw-r--r-- | src/mongo/transport/grpc/bidirectional_pipe.h | 31 |
1 files changed, 25 insertions, 6 deletions
diff --git a/src/mongo/transport/grpc/bidirectional_pipe.h b/src/mongo/transport/grpc/bidirectional_pipe.h index f1c29471eb0..b53d61b519c 100644 --- a/src/mongo/transport/grpc/bidirectional_pipe.h +++ b/src/mongo/transport/grpc/bidirectional_pipe.h @@ -89,20 +89,36 @@ public: } /** - * Close both ends of the pipe. In progress reads and writes on either end will be - * interrupted. + * Close both the read and write halves of this end of the pipe. In-progress reads and + * writes on this end and writes on the other end will be interrupted. + * + * Messages that have already been transmitted through this end of the pipe can still be + * read by the other end. */ void close() { _sendHalf.close(); _recvHalf.close(); } + /** + * Returns true when at least one of the following conditions is met: + * - This end of the pipe is closed. + * - The other end of the pipe is closed and there are no more messages to be read. + */ + bool isConsumed() const { + auto stats = _recvHalfCtrl.getStats(); + return stats.consumerEndClosed || (stats.queueDepth == 0 && stats.producerEndClosed); + } + private: friend BidirectionalPipe; explicit End(SingleProducerSingleConsumerQueue<SharedBuffer>::Producer send, - SingleProducerSingleConsumerQueue<SharedBuffer>::Consumer recv) - : _sendHalf{std::move(send)}, _recvHalf{std::move(recv)} {} + SingleProducerSingleConsumerQueue<SharedBuffer>::Consumer recv, + SingleProducerSingleConsumerQueue<SharedBuffer>::Controller recvCtrl) + : _sendHalf{std::move(send)}, + _recvHalf{std::move(recv)}, + _recvHalfCtrl(std::move(recvCtrl)) {} bool _isPipeClosedError(const DBException& e) const { return e.code() == ErrorCodes::ProducerConsumerQueueEndClosed || @@ -111,14 +127,17 @@ public: SingleProducerSingleConsumerQueue<SharedBuffer>::Producer _sendHalf; SingleProducerSingleConsumerQueue<SharedBuffer>::Consumer _recvHalf; + SingleProducerSingleConsumerQueue<SharedBuffer>::Controller _recvHalfCtrl; }; BidirectionalPipe() { SingleProducerSingleConsumerQueue<SharedBuffer>::Pipe pipe1; SingleProducerSingleConsumerQueue<SharedBuffer>::Pipe pipe2; - left = std::unique_ptr<End>(new End(std::move(pipe1.producer), std::move(pipe2.consumer))); - right = std::unique_ptr<End>(new End(std::move(pipe2.producer), std::move(pipe1.consumer))); + left = std::unique_ptr<End>(new End( + std::move(pipe1.producer), std::move(pipe2.consumer), std::move(pipe2.controller))); + right = std::unique_ptr<End>(new End( + std::move(pipe2.producer), std::move(pipe1.consumer), std::move(pipe1.controller))); } std::unique_ptr<End> left; |