summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/ClientChannel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/ClientChannel.cpp')
-rw-r--r--cpp/src/qpid/client/ClientChannel.cpp64
1 files changed, 38 insertions, 26 deletions
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp
index a014fd90c5..87062e1470 100644
--- a/cpp/src/qpid/client/ClientChannel.cpp
+++ b/cpp/src/qpid/client/ClientChannel.cpp
@@ -26,8 +26,10 @@
#include "ClientMessage.h"
#include "qpid/QpidError.h"
#include "Connection.h"
+#include "Demux.h"
#include "FutureResponse.h"
#include "MessageListener.h"
+#include "MessageQueue.h"
#include <boost/format.hpp>
#include <boost/bind.hpp>
#include "qpid/framing/all_method_bodies.h"
@@ -72,6 +74,9 @@ void Channel::open(const Session& s)
THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel");
active = true;
session = s;
+ if(isTransactional()) {
+ session.txSelect();
+ }
}
bool Channel::isOpen() const {
@@ -79,17 +84,8 @@ bool Channel::isOpen() const {
return active;
}
-void Channel::setQos() {
- session.basicQos((prefetchCount=getPrefetch(), global=false));
- if(isTransactional()) {
- //I think this is wrong! should only send TxSelect once...
- session.txSelect();
- }
-}
-
-void Channel::setPrefetch(uint16_t _prefetch){
+void Channel::setPrefetch(uint32_t _prefetch){
prefetch = _prefetch;
- setQos();
}
void Channel::declareExchange(Exchange& _exchange, bool synch){
@@ -157,6 +153,9 @@ void Channel::consume(
session.messageSubscribe(0, _queue.getName(), tag, noLocal,
confirmMode, 0/*pre-acquire*/,
false, fields ? *fields : FieldTable());
+ //allocate some credit:
+ session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF);
+ session.messageFlow(tag, 0/*MESSAGES*/, prefetch ? prefetch : 0xFFFFFFFF);
}
void Channel::cancel(const std::string& tag, bool synch) {
@@ -173,21 +172,29 @@ void Channel::cancel(const std::string& tag, bool synch) {
session.messageCancel(tag);
}
-bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
- Response response = session.basicGet(0, queue.getName(), ackMode == NO_ACK);
- session.execution().sendFlushRequest();
- if (response.isA<BasicGetEmptyBody>()) {
+bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) {
+ string tag = "get-handler";
+ ScopedDivert handler(tag, session.execution().getDemux());
+ Demux::Queue& incoming = handler.getQueue();
+
+ session.messageSubscribe((destination=tag, queue=_queue.getName(), confirmMode=(ackMode == NO_ACK ? 0 : 1)));
+ session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF);
+ session.messageFlow(tag, 0/*MESSAGES*/, 1);
+ Completion status = session.messageFlush(tag);
+ status.sync();
+ session.messageCancel(tag);
+
+ if (incoming.empty()) {
return false;
} else {
- FrameSet::shared_ptr content = gets.pop();
- msg.populate(*content);
+ msg.populate(*(incoming.pop()));
return true;
}
}
void Channel::publish(Message& msg, const Exchange& exchange,
const std::string& routingKey,
- bool mandatory, bool /*immediate TODO-restore immediate?*/) {
+ bool mandatory, bool /*?TODO-restore immediate?*/) {
msg.getDeliveryProperties().setRoutingKey(routingKey);
msg.getDeliveryProperties().setDiscardUnroutable(!mandatory);
@@ -224,14 +231,23 @@ void Channel::join() {
void Channel::dispatch(FrameSet& content, const std::string& destination)
{
- ConsumerMap::iterator i = consumers.find(destination);
- if (i != consumers.end()) {
+ MessageListener* listener(0);
+ {
+ Mutex::ScopedLock l(lock);
+ ConsumerMap::iterator i = consumers.find(destination);
+ if (i != consumers.end()) {
+ Message msg;
+ msg.populate(content);
+ listener = i->second.listener;
+ }
+ }
+ if (listener) {
Message msg;
msg.populate(content);
- i->second.listener->received(msg);
+ listener->received(msg);
} else {
QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination);
- }
+ }
}
void Channel::run() {
@@ -239,12 +255,8 @@ void Channel::run() {
while (true) {
FrameSet::shared_ptr content = session.get();
//need to dispatch this to the relevant listener:
- if (content->isA<BasicDeliverBody>()) {
- dispatch(*content, content->as<BasicDeliverBody>()->getConsumerTag());
- } else if (content->isA<MessageTransferBody>()) {
+ if (content->isA<MessageTransferBody>()) {
dispatch(*content, content->as<MessageTransferBody>()->getDestination());
- } else if (content->isA<BasicGetOkBody>()) {
- gets.push(content);
} else {
QPID_LOG(warning, "Dropping unsupported message type: " << content->getMethod());
}