diff options
Diffstat (limited to 'cpp/src/qpid/client/ClientChannel.cpp')
-rw-r--r-- | cpp/src/qpid/client/ClientChannel.cpp | 64 |
1 files changed, 38 insertions, 26 deletions
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp index a014fd90c5..87062e1470 100644 --- a/cpp/src/qpid/client/ClientChannel.cpp +++ b/cpp/src/qpid/client/ClientChannel.cpp @@ -26,8 +26,10 @@ #include "ClientMessage.h" #include "qpid/QpidError.h" #include "Connection.h" +#include "Demux.h" #include "FutureResponse.h" #include "MessageListener.h" +#include "MessageQueue.h" #include <boost/format.hpp> #include <boost/bind.hpp> #include "qpid/framing/all_method_bodies.h" @@ -72,6 +74,9 @@ void Channel::open(const Session& s) THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel"); active = true; session = s; + if(isTransactional()) { + session.txSelect(); + } } bool Channel::isOpen() const { @@ -79,17 +84,8 @@ bool Channel::isOpen() const { return active; } -void Channel::setQos() { - session.basicQos((prefetchCount=getPrefetch(), global=false)); - if(isTransactional()) { - //I think this is wrong! should only send TxSelect once... - session.txSelect(); - } -} - -void Channel::setPrefetch(uint16_t _prefetch){ +void Channel::setPrefetch(uint32_t _prefetch){ prefetch = _prefetch; - setQos(); } void Channel::declareExchange(Exchange& _exchange, bool synch){ @@ -157,6 +153,9 @@ void Channel::consume( session.messageSubscribe(0, _queue.getName(), tag, noLocal, confirmMode, 0/*pre-acquire*/, false, fields ? *fields : FieldTable()); + //allocate some credit: + session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF); + session.messageFlow(tag, 0/*MESSAGES*/, prefetch ? prefetch : 0xFFFFFFFF); } void Channel::cancel(const std::string& tag, bool synch) { @@ -173,21 +172,29 @@ void Channel::cancel(const std::string& tag, bool synch) { session.messageCancel(tag); } -bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) { - Response response = session.basicGet(0, queue.getName(), ackMode == NO_ACK); - session.execution().sendFlushRequest(); - if (response.isA<BasicGetEmptyBody>()) { +bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) { + string tag = "get-handler"; + ScopedDivert handler(tag, session.execution().getDemux()); + Demux::Queue& incoming = handler.getQueue(); + + session.messageSubscribe((destination=tag, queue=_queue.getName(), confirmMode=(ackMode == NO_ACK ? 0 : 1))); + session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF); + session.messageFlow(tag, 0/*MESSAGES*/, 1); + Completion status = session.messageFlush(tag); + status.sync(); + session.messageCancel(tag); + + if (incoming.empty()) { return false; } else { - FrameSet::shared_ptr content = gets.pop(); - msg.populate(*content); + msg.populate(*(incoming.pop())); return true; } } void Channel::publish(Message& msg, const Exchange& exchange, const std::string& routingKey, - bool mandatory, bool /*immediate TODO-restore immediate?*/) { + bool mandatory, bool /*?TODO-restore immediate?*/) { msg.getDeliveryProperties().setRoutingKey(routingKey); msg.getDeliveryProperties().setDiscardUnroutable(!mandatory); @@ -224,14 +231,23 @@ void Channel::join() { void Channel::dispatch(FrameSet& content, const std::string& destination) { - ConsumerMap::iterator i = consumers.find(destination); - if (i != consumers.end()) { + MessageListener* listener(0); + { + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find(destination); + if (i != consumers.end()) { + Message msg; + msg.populate(content); + listener = i->second.listener; + } + } + if (listener) { Message msg; msg.populate(content); - i->second.listener->received(msg); + listener->received(msg); } else { QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination); - } + } } void Channel::run() { @@ -239,12 +255,8 @@ void Channel::run() { while (true) { FrameSet::shared_ptr content = session.get(); //need to dispatch this to the relevant listener: - if (content->isA<BasicDeliverBody>()) { - dispatch(*content, content->as<BasicDeliverBody>()->getConsumerTag()); - } else if (content->isA<MessageTransferBody>()) { + if (content->isA<MessageTransferBody>()) { dispatch(*content, content->as<MessageTransferBody>()->getDestination()); - } else if (content->isA<BasicGetOkBody>()) { - gets.push(content); } else { QPID_LOG(warning, "Dropping unsupported message type: " << content->getMethod()); } |