summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2007-02-06 14:20:42 +0000
committerAndrew Stitcher <astitcher@apache.org>2007-02-06 14:20:42 +0000
commitdd05cfcd8322cf786e78ed8dbdf4e723f25e9751 (patch)
tree65cccebf041e53daca579f1190ed9f67843403f5
parent195c5077ae4040e97b5e77f4941b13fd551f0896 (diff)
downloadqpid-python-dd05cfcd8322cf786e78ed8dbdf4e723f25e9751.tar.gz
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@504151 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/lib/broker/BrokerAdapter.cpp23
1 files changed, 23 insertions, 0 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp
index 577c053413..86c5024deb 100644
--- a/cpp/lib/broker/BrokerAdapter.cpp
+++ b/cpp/lib/broker/BrokerAdapter.cpp
@@ -424,6 +424,10 @@ BrokerAdapter::MessageHandlerImpl::consume( u_int16_t channelId,
if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
else throw ChannelException(403, "Access would violate previously granted exclusivity");
}
+
+ connection.getChannel(channel).cancel(destination);
+
+ connection.client->getMessageHandler()->ok(channel);
}
void
@@ -456,6 +460,25 @@ BrokerAdapter::MessageHandlerImpl::offset( u_int16_t /*channel*/,
u_int64_t /*value*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+
+ Queue::shared_ptr queue = connection.getQueue(queueName, channelId);
+ Channel& channel = connection.getChannel(channelId);
+ if(!destination.empty() && channel.exists(destination)){
+ throw ConnectionException(530, "Consumer tags must be unique");
+ }
+
+ try{
+ string newTag = destination;
+ channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter);
+
+ connection.client->getMessageHandler()->ok(channelId);
+
+ //allow messages to be dispatched if required as there is now a consumer:
+ queue->dispatch();
+ }catch(ExclusiveAccessException& e){
+ if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
+ else throw ChannelException(403, "Access would violate previously granted exclusivity");
+ }
}
void