diff options
Diffstat (limited to 'cpp/src/client/BasicMessageChannel.cpp')
-rw-r--r-- | cpp/src/client/BasicMessageChannel.cpp | 66 |
1 files changed, 8 insertions, 58 deletions
diff --git a/cpp/src/client/BasicMessageChannel.cpp b/cpp/src/client/BasicMessageChannel.cpp index 26c3fe543c..9e3d184673 100644 --- a/cpp/src/client/BasicMessageChannel.cpp +++ b/cpp/src/client/BasicMessageChannel.cpp @@ -45,60 +45,10 @@ const std::string BASIC_RETURN("__basic_return__"); const std::string BASIC_REF("__basic_reference__"); } -class BasicMessageChannel::WaitableDestination : - public IncomingMessage::Destination -{ - public: - WaitableDestination() : shutdownFlag(false) {} - void message(const Message& msg) { - Mutex::ScopedLock l(monitor); - queue.push(msg); - monitor.notify(); - } - - void empty() { - Mutex::ScopedLock l(monitor); - queue.push(Empty()); - monitor.notify(); - } - - bool wait(Message& msgOut) { - Mutex::ScopedLock l(monitor); - while (queue.empty() && !shutdownFlag) - monitor.wait(); - if (shutdownFlag) - return false; - Message* msg = boost::get<Message>(&queue.front()); - bool success = msg; - if (success) - msgOut=*msg; - queue.pop(); - if (!queue.empty()) - monitor.notify(); // Wake another waiter. - return success; - } - - void shutdown() { - Mutex::ScopedLock l(monitor); - shutdownFlag = true; - monitor.notifyAll(); - } - - private: - struct Empty {}; - typedef boost::variant<Message,Empty> Item; - sys::Monitor monitor; - std::queue<Item> queue; - bool shutdownFlag; -}; - - BasicMessageChannel::BasicMessageChannel(Channel& ch) - : channel(ch), returnsHandler(0), - destGet(new WaitableDestination()), - destDispatch(new WaitableDestination()) + : channel(ch), returnsHandler(0) { - incoming.addDestination(BASIC_RETURN, *destDispatch); + incoming.addDestination(BASIC_RETURN, destDispatch); } void BasicMessageChannel::consume( @@ -162,8 +112,8 @@ void BasicMessageChannel::close(){ consumersCopy = consumers; consumers.clear(); } - destGet->shutdown(); - destDispatch->shutdown(); + destGet.shutdown(); + destDispatch.shutdown(); for (ConsumerMap::iterator i=consumersCopy.begin(); i != consumersCopy.end(); ++i) { @@ -181,10 +131,10 @@ bool BasicMessageChannel::get( Message& msg, const Queue& queue, AckMode ackMode) { // Prepare for incoming response - incoming.addDestination(BASIC_GET, *destGet); + incoming.addDestination(BASIC_GET, destGet); channel.send( new BasicGetBody(channel.version, 0, queue.getName(), ackMode)); - bool got = destGet->wait(msg); + bool got = destGet.wait(msg); return got; } @@ -273,7 +223,7 @@ void BasicMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) { } // FIXME aconway 2007-03-23: Integrate consumer & destination // maps. - incoming.addDestination(tag, *destDispatch); + incoming.addDestination(tag, destDispatch); return; } } @@ -342,7 +292,7 @@ void BasicMessageChannel::run() { while(channel.isOpen()) { try { Message msg; - bool gotMessge = destDispatch->wait(msg); + bool gotMessge = destDispatch.wait(msg); if (gotMessge) { if(msg.getDestination() == BASIC_RETURN) { ReturnedMessageHandler* handler=0; |