summaryrefslogtreecommitdiff
path: root/cpp/src/client/BasicMessageChannel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/client/BasicMessageChannel.cpp')
-rw-r--r--cpp/src/client/BasicMessageChannel.cpp66
1 files changed, 8 insertions, 58 deletions
diff --git a/cpp/src/client/BasicMessageChannel.cpp b/cpp/src/client/BasicMessageChannel.cpp
index 26c3fe543c..9e3d184673 100644
--- a/cpp/src/client/BasicMessageChannel.cpp
+++ b/cpp/src/client/BasicMessageChannel.cpp
@@ -45,60 +45,10 @@ const std::string BASIC_RETURN("__basic_return__");
const std::string BASIC_REF("__basic_reference__");
}
-class BasicMessageChannel::WaitableDestination :
- public IncomingMessage::Destination
-{
- public:
- WaitableDestination() : shutdownFlag(false) {}
- void message(const Message& msg) {
- Mutex::ScopedLock l(monitor);
- queue.push(msg);
- monitor.notify();
- }
-
- void empty() {
- Mutex::ScopedLock l(monitor);
- queue.push(Empty());
- monitor.notify();
- }
-
- bool wait(Message& msgOut) {
- Mutex::ScopedLock l(monitor);
- while (queue.empty() && !shutdownFlag)
- monitor.wait();
- if (shutdownFlag)
- return false;
- Message* msg = boost::get<Message>(&queue.front());
- bool success = msg;
- if (success)
- msgOut=*msg;
- queue.pop();
- if (!queue.empty())
- monitor.notify(); // Wake another waiter.
- return success;
- }
-
- void shutdown() {
- Mutex::ScopedLock l(monitor);
- shutdownFlag = true;
- monitor.notifyAll();
- }
-
- private:
- struct Empty {};
- typedef boost::variant<Message,Empty> Item;
- sys::Monitor monitor;
- std::queue<Item> queue;
- bool shutdownFlag;
-};
-
-
BasicMessageChannel::BasicMessageChannel(Channel& ch)
- : channel(ch), returnsHandler(0),
- destGet(new WaitableDestination()),
- destDispatch(new WaitableDestination())
+ : channel(ch), returnsHandler(0)
{
- incoming.addDestination(BASIC_RETURN, *destDispatch);
+ incoming.addDestination(BASIC_RETURN, destDispatch);
}
void BasicMessageChannel::consume(
@@ -162,8 +112,8 @@ void BasicMessageChannel::close(){
consumersCopy = consumers;
consumers.clear();
}
- destGet->shutdown();
- destDispatch->shutdown();
+ destGet.shutdown();
+ destDispatch.shutdown();
for (ConsumerMap::iterator i=consumersCopy.begin();
i != consumersCopy.end(); ++i)
{
@@ -181,10 +131,10 @@ bool BasicMessageChannel::get(
Message& msg, const Queue& queue, AckMode ackMode)
{
// Prepare for incoming response
- incoming.addDestination(BASIC_GET, *destGet);
+ incoming.addDestination(BASIC_GET, destGet);
channel.send(
new BasicGetBody(channel.version, 0, queue.getName(), ackMode));
- bool got = destGet->wait(msg);
+ bool got = destGet.wait(msg);
return got;
}
@@ -273,7 +223,7 @@ void BasicMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) {
}
// FIXME aconway 2007-03-23: Integrate consumer & destination
// maps.
- incoming.addDestination(tag, *destDispatch);
+ incoming.addDestination(tag, destDispatch);
return;
}
}
@@ -342,7 +292,7 @@ void BasicMessageChannel::run() {
while(channel.isOpen()) {
try {
Message msg;
- bool gotMessge = destDispatch->wait(msg);
+ bool gotMessge = destDispatch.wait(msg);
if (gotMessge) {
if(msg.getDestination() == BASIC_RETURN) {
ReturnedMessageHandler* handler=0;