diff options
author | Alan Conway <aconway@apache.org> | 2007-04-03 21:18:17 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-04-03 21:18:17 +0000 |
commit | 0184b49a6c2f0e3174b8b5ac5e33342123737a14 (patch) | |
tree | 4820f184c930705d941eeadce20e183e65642bad /cpp/src | |
parent | 80522c12572f20aca4b68223a5f263721cb99a16 (diff) | |
download | qpid-python-0184b49a6c2f0e3174b8b5ac5e33342123737a14.tar.gz |
Moved BasicMessage::WaitableDestination to IncomingMessage::WaitableDestination so it can be shared by Basic and Message implementations.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@525282 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/client/BasicMessageChannel.cpp | 66 | ||||
-rw-r--r-- | cpp/src/client/BasicMessageChannel.h | 5 | ||||
-rw-r--r-- | cpp/src/client/IncomingMessage.cpp | 38 | ||||
-rw-r--r-- | cpp/src/client/IncomingMessage.h | 26 |
4 files changed, 72 insertions, 63 deletions
diff --git a/cpp/src/client/BasicMessageChannel.cpp b/cpp/src/client/BasicMessageChannel.cpp index 26c3fe543c..9e3d184673 100644 --- a/cpp/src/client/BasicMessageChannel.cpp +++ b/cpp/src/client/BasicMessageChannel.cpp @@ -45,60 +45,10 @@ const std::string BASIC_RETURN("__basic_return__"); const std::string BASIC_REF("__basic_reference__"); } -class BasicMessageChannel::WaitableDestination : - public IncomingMessage::Destination -{ - public: - WaitableDestination() : shutdownFlag(false) {} - void message(const Message& msg) { - Mutex::ScopedLock l(monitor); - queue.push(msg); - monitor.notify(); - } - - void empty() { - Mutex::ScopedLock l(monitor); - queue.push(Empty()); - monitor.notify(); - } - - bool wait(Message& msgOut) { - Mutex::ScopedLock l(monitor); - while (queue.empty() && !shutdownFlag) - monitor.wait(); - if (shutdownFlag) - return false; - Message* msg = boost::get<Message>(&queue.front()); - bool success = msg; - if (success) - msgOut=*msg; - queue.pop(); - if (!queue.empty()) - monitor.notify(); // Wake another waiter. - return success; - } - - void shutdown() { - Mutex::ScopedLock l(monitor); - shutdownFlag = true; - monitor.notifyAll(); - } - - private: - struct Empty {}; - typedef boost::variant<Message,Empty> Item; - sys::Monitor monitor; - std::queue<Item> queue; - bool shutdownFlag; -}; - - BasicMessageChannel::BasicMessageChannel(Channel& ch) - : channel(ch), returnsHandler(0), - destGet(new WaitableDestination()), - destDispatch(new WaitableDestination()) + : channel(ch), returnsHandler(0) { - incoming.addDestination(BASIC_RETURN, *destDispatch); + incoming.addDestination(BASIC_RETURN, destDispatch); } void BasicMessageChannel::consume( @@ -162,8 +112,8 @@ void BasicMessageChannel::close(){ consumersCopy = consumers; consumers.clear(); } - destGet->shutdown(); - destDispatch->shutdown(); + destGet.shutdown(); + destDispatch.shutdown(); for (ConsumerMap::iterator i=consumersCopy.begin(); i != consumersCopy.end(); ++i) { @@ -181,10 +131,10 @@ bool BasicMessageChannel::get( Message& msg, const Queue& queue, AckMode ackMode) { // Prepare for incoming response - incoming.addDestination(BASIC_GET, *destGet); + incoming.addDestination(BASIC_GET, destGet); channel.send( new BasicGetBody(channel.version, 0, queue.getName(), ackMode)); - bool got = destGet->wait(msg); + bool got = destGet.wait(msg); return got; } @@ -273,7 +223,7 @@ void BasicMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) { } // FIXME aconway 2007-03-23: Integrate consumer & destination // maps. - incoming.addDestination(tag, *destDispatch); + incoming.addDestination(tag, destDispatch); return; } } @@ -342,7 +292,7 @@ void BasicMessageChannel::run() { while(channel.isOpen()) { try { Message msg; - bool gotMessge = destDispatch->wait(msg); + bool gotMessge = destDispatch.wait(msg); if (gotMessge) { if(msg.getDestination() == BASIC_RETURN) { ReturnedMessageHandler* handler=0; diff --git a/cpp/src/client/BasicMessageChannel.h b/cpp/src/client/BasicMessageChannel.h index aaedfd6bf1..13e1cf1e00 100644 --- a/cpp/src/client/BasicMessageChannel.h +++ b/cpp/src/client/BasicMessageChannel.h @@ -63,7 +63,6 @@ class BasicMessageChannel : public MessageChannel private: - class WaitableDestination; struct Consumer{ MessageListener* listener; AckMode ackMode; @@ -80,8 +79,8 @@ class BasicMessageChannel : public MessageChannel uint64_t incoming_size; ConsumerMap consumers ; ReturnedMessageHandler* returnsHandler; - boost::scoped_ptr<WaitableDestination> destGet; - boost::scoped_ptr<WaitableDestination> destDispatch; + IncomingMessage::WaitableDestination destGet; + IncomingMessage::WaitableDestination destDispatch; }; }} // namespace qpid::client diff --git a/cpp/src/client/IncomingMessage.cpp b/cpp/src/client/IncomingMessage.cpp index 05c4bc2378..eb5f2b6fae 100644 --- a/cpp/src/client/IncomingMessage.cpp +++ b/cpp/src/client/IncomingMessage.cpp @@ -32,6 +32,44 @@ using sys::Mutex; IncomingMessage::Destination::~Destination() {} + +IncomingMessage::WaitableDestination::WaitableDestination() + : shutdownFlag(false) {} + +void IncomingMessage::WaitableDestination::message(const Message& msg) { + Mutex::ScopedLock l(monitor); + queue.push(msg); + monitor.notify(); +} + +void IncomingMessage::WaitableDestination::empty() { + Mutex::ScopedLock l(monitor); + queue.push(Empty()); + monitor.notify(); +} + +bool IncomingMessage::WaitableDestination::wait(Message& msgOut) { + Mutex::ScopedLock l(monitor); + while (queue.empty() && !shutdownFlag) + monitor.wait(); + if (shutdownFlag) + return false; + Message* msg = boost::get<Message>(&queue.front()); + bool success = msg; + if (success) + msgOut=*msg; + queue.pop(); + if (!queue.empty()) + monitor.notify(); // Wake another waiter. + return success; +} + +void IncomingMessage::WaitableDestination::shutdown() { + Mutex::ScopedLock l(monitor); + shutdownFlag = true; + monitor.notifyAll(); +} + void IncomingMessage::openReference(const std::string& name) { Mutex::ScopedLock l(lock); if (references.find(name) != references.end()) diff --git a/cpp/src/client/IncomingMessage.h b/cpp/src/client/IncomingMessage.h index b01bd3eedc..bc650dfbe1 100644 --- a/cpp/src/client/IncomingMessage.h +++ b/cpp/src/client/IncomingMessage.h @@ -21,10 +21,11 @@ * under the License. * */ -#include "../sys/Mutex.h" +#include "../sys/Monitor.h" #include <map> +#include <queue> #include <vector> - +#include <boost/variant.hpp> namespace qpid { namespace client { @@ -61,6 +62,27 @@ class IncomingMessage { }; + /** A destination that a thread can wait on till a message arrives. */ + class WaitableDestination : public Destination + { + public: + WaitableDestination(); + void message(const Message& msg); + void empty(); + /** Wait till message() or empty() is called. True for message() */ + bool wait(Message& msgOut); + void shutdown(); + + private: + struct Empty {}; + typedef boost::variant<Message,Empty> Item; + sys::Monitor monitor; + std::queue<Item> queue; + bool shutdownFlag; + }; + + + /** Add a reference. Throws if already open. */ void openReference(const std::string& name); |