summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-04-03 21:18:17 +0000
committerAlan Conway <aconway@apache.org>2007-04-03 21:18:17 +0000
commit0184b49a6c2f0e3174b8b5ac5e33342123737a14 (patch)
tree4820f184c930705d941eeadce20e183e65642bad /cpp/src
parent80522c12572f20aca4b68223a5f263721cb99a16 (diff)
downloadqpid-python-0184b49a6c2f0e3174b8b5ac5e33342123737a14.tar.gz
Moved BasicMessage::WaitableDestination to IncomingMessage::WaitableDestination so it can be shared by Basic and Message implementations.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@525282 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/client/BasicMessageChannel.cpp66
-rw-r--r--cpp/src/client/BasicMessageChannel.h5
-rw-r--r--cpp/src/client/IncomingMessage.cpp38
-rw-r--r--cpp/src/client/IncomingMessage.h26
4 files changed, 72 insertions, 63 deletions
diff --git a/cpp/src/client/BasicMessageChannel.cpp b/cpp/src/client/BasicMessageChannel.cpp
index 26c3fe543c..9e3d184673 100644
--- a/cpp/src/client/BasicMessageChannel.cpp
+++ b/cpp/src/client/BasicMessageChannel.cpp
@@ -45,60 +45,10 @@ const std::string BASIC_RETURN("__basic_return__");
const std::string BASIC_REF("__basic_reference__");
}
-class BasicMessageChannel::WaitableDestination :
- public IncomingMessage::Destination
-{
- public:
- WaitableDestination() : shutdownFlag(false) {}
- void message(const Message& msg) {
- Mutex::ScopedLock l(monitor);
- queue.push(msg);
- monitor.notify();
- }
-
- void empty() {
- Mutex::ScopedLock l(monitor);
- queue.push(Empty());
- monitor.notify();
- }
-
- bool wait(Message& msgOut) {
- Mutex::ScopedLock l(monitor);
- while (queue.empty() && !shutdownFlag)
- monitor.wait();
- if (shutdownFlag)
- return false;
- Message* msg = boost::get<Message>(&queue.front());
- bool success = msg;
- if (success)
- msgOut=*msg;
- queue.pop();
- if (!queue.empty())
- monitor.notify(); // Wake another waiter.
- return success;
- }
-
- void shutdown() {
- Mutex::ScopedLock l(monitor);
- shutdownFlag = true;
- monitor.notifyAll();
- }
-
- private:
- struct Empty {};
- typedef boost::variant<Message,Empty> Item;
- sys::Monitor monitor;
- std::queue<Item> queue;
- bool shutdownFlag;
-};
-
-
BasicMessageChannel::BasicMessageChannel(Channel& ch)
- : channel(ch), returnsHandler(0),
- destGet(new WaitableDestination()),
- destDispatch(new WaitableDestination())
+ : channel(ch), returnsHandler(0)
{
- incoming.addDestination(BASIC_RETURN, *destDispatch);
+ incoming.addDestination(BASIC_RETURN, destDispatch);
}
void BasicMessageChannel::consume(
@@ -162,8 +112,8 @@ void BasicMessageChannel::close(){
consumersCopy = consumers;
consumers.clear();
}
- destGet->shutdown();
- destDispatch->shutdown();
+ destGet.shutdown();
+ destDispatch.shutdown();
for (ConsumerMap::iterator i=consumersCopy.begin();
i != consumersCopy.end(); ++i)
{
@@ -181,10 +131,10 @@ bool BasicMessageChannel::get(
Message& msg, const Queue& queue, AckMode ackMode)
{
// Prepare for incoming response
- incoming.addDestination(BASIC_GET, *destGet);
+ incoming.addDestination(BASIC_GET, destGet);
channel.send(
new BasicGetBody(channel.version, 0, queue.getName(), ackMode));
- bool got = destGet->wait(msg);
+ bool got = destGet.wait(msg);
return got;
}
@@ -273,7 +223,7 @@ void BasicMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) {
}
// FIXME aconway 2007-03-23: Integrate consumer & destination
// maps.
- incoming.addDestination(tag, *destDispatch);
+ incoming.addDestination(tag, destDispatch);
return;
}
}
@@ -342,7 +292,7 @@ void BasicMessageChannel::run() {
while(channel.isOpen()) {
try {
Message msg;
- bool gotMessge = destDispatch->wait(msg);
+ bool gotMessge = destDispatch.wait(msg);
if (gotMessge) {
if(msg.getDestination() == BASIC_RETURN) {
ReturnedMessageHandler* handler=0;
diff --git a/cpp/src/client/BasicMessageChannel.h b/cpp/src/client/BasicMessageChannel.h
index aaedfd6bf1..13e1cf1e00 100644
--- a/cpp/src/client/BasicMessageChannel.h
+++ b/cpp/src/client/BasicMessageChannel.h
@@ -63,7 +63,6 @@ class BasicMessageChannel : public MessageChannel
private:
- class WaitableDestination;
struct Consumer{
MessageListener* listener;
AckMode ackMode;
@@ -80,8 +79,8 @@ class BasicMessageChannel : public MessageChannel
uint64_t incoming_size;
ConsumerMap consumers ;
ReturnedMessageHandler* returnsHandler;
- boost::scoped_ptr<WaitableDestination> destGet;
- boost::scoped_ptr<WaitableDestination> destDispatch;
+ IncomingMessage::WaitableDestination destGet;
+ IncomingMessage::WaitableDestination destDispatch;
};
}} // namespace qpid::client
diff --git a/cpp/src/client/IncomingMessage.cpp b/cpp/src/client/IncomingMessage.cpp
index 05c4bc2378..eb5f2b6fae 100644
--- a/cpp/src/client/IncomingMessage.cpp
+++ b/cpp/src/client/IncomingMessage.cpp
@@ -32,6 +32,44 @@ using sys::Mutex;
IncomingMessage::Destination::~Destination() {}
+
+IncomingMessage::WaitableDestination::WaitableDestination()
+ : shutdownFlag(false) {}
+
+void IncomingMessage::WaitableDestination::message(const Message& msg) {
+ Mutex::ScopedLock l(monitor);
+ queue.push(msg);
+ monitor.notify();
+}
+
+void IncomingMessage::WaitableDestination::empty() {
+ Mutex::ScopedLock l(monitor);
+ queue.push(Empty());
+ monitor.notify();
+}
+
+bool IncomingMessage::WaitableDestination::wait(Message& msgOut) {
+ Mutex::ScopedLock l(monitor);
+ while (queue.empty() && !shutdownFlag)
+ monitor.wait();
+ if (shutdownFlag)
+ return false;
+ Message* msg = boost::get<Message>(&queue.front());
+ bool success = msg;
+ if (success)
+ msgOut=*msg;
+ queue.pop();
+ if (!queue.empty())
+ monitor.notify(); // Wake another waiter.
+ return success;
+}
+
+void IncomingMessage::WaitableDestination::shutdown() {
+ Mutex::ScopedLock l(monitor);
+ shutdownFlag = true;
+ monitor.notifyAll();
+}
+
void IncomingMessage::openReference(const std::string& name) {
Mutex::ScopedLock l(lock);
if (references.find(name) != references.end())
diff --git a/cpp/src/client/IncomingMessage.h b/cpp/src/client/IncomingMessage.h
index b01bd3eedc..bc650dfbe1 100644
--- a/cpp/src/client/IncomingMessage.h
+++ b/cpp/src/client/IncomingMessage.h
@@ -21,10 +21,11 @@
* under the License.
*
*/
-#include "../sys/Mutex.h"
+#include "../sys/Monitor.h"
#include <map>
+#include <queue>
#include <vector>
-
+#include <boost/variant.hpp>
namespace qpid {
namespace client {
@@ -61,6 +62,27 @@ class IncomingMessage {
};
+ /** A destination that a thread can wait on till a message arrives. */
+ class WaitableDestination : public Destination
+ {
+ public:
+ WaitableDestination();
+ void message(const Message& msg);
+ void empty();
+ /** Wait till message() or empty() is called. True for message() */
+ bool wait(Message& msgOut);
+ void shutdown();
+
+ private:
+ struct Empty {};
+ typedef boost::variant<Message,Empty> Item;
+ sys::Monitor monitor;
+ std::queue<Item> queue;
+ bool shutdownFlag;
+ };
+
+
+
/** Add a reference. Throws if already open. */
void openReference(const std::string& name);