From c922ccae07d060f891848e688f7f1e29dc07c552 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Tue, 11 Sep 2007 11:25:27 +0000 Subject: Moved old ClientChannel class from using basic to using message for publish & consume. (Get and qos still use the basic class's defintions, that will be changed next) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@574551 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/client/ClientChannel.cpp | 45 ++++++++++++++++++++--------------- 1 file changed, 26 insertions(+), 19 deletions(-) (limited to 'cpp/src/qpid/client/ClientChannel.cpp') diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp index b77840f433..a014fd90c5 100644 --- a/cpp/src/qpid/client/ClientChannel.cpp +++ b/cpp/src/qpid/client/ClientChannel.cpp @@ -136,7 +136,7 @@ void Channel::rollback(){ } void Channel::consume( - Queue& queue, const std::string& tag, MessageListener* listener, + Queue& _queue, const std::string& tag, MessageListener* listener, AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) { if (tag.empty()) { @@ -152,10 +152,11 @@ void Channel::consume( c.ackMode = ackMode; c.lastDeliveryTag = 0; } + uint8_t confirmMode = ackMode == NO_ACK ? 0 : 1; ScopedSync s(session, synch); - session.basicConsume(0, queue.getName(), tag, noLocal, - ackMode == NO_ACK, false, !synch, - fields ? *fields : FieldTable()); + session.messageSubscribe(0, _queue.getName(), tag, noLocal, + confirmMode, 0/*pre-acquire*/, + false, fields ? *fields : FieldTable()); } void Channel::cancel(const std::string& tag, bool synch) { @@ -169,7 +170,7 @@ void Channel::cancel(const std::string& tag, bool synch) { consumers.erase(i); } ScopedSync s(session, synch); - session.basicCancel(tag); + session.messageCancel(tag); } bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) { @@ -184,14 +185,13 @@ bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) { } } -void Channel::publish(const Message& msg, const Exchange& exchange, +void Channel::publish(Message& msg, const Exchange& exchange, const std::string& routingKey, - bool mandatory, bool immediate) { + bool mandatory, bool /*immediate TODO-restore immediate?*/) { - const string e = exchange.getName(); - string key = routingKey; - - session.basicPublish(0, e, key, mandatory, immediate, msg); + msg.getDeliveryProperties().setRoutingKey(routingKey); + msg.getDeliveryProperties().setDiscardUnroutable(!mandatory); + session.messageTransfer((destination=exchange.getName(), content=msg)); } void Channel::close() @@ -222,20 +222,27 @@ void Channel::join() { } } +void Channel::dispatch(FrameSet& content, const std::string& destination) +{ + ConsumerMap::iterator i = consumers.find(destination); + if (i != consumers.end()) { + Message msg; + msg.populate(content); + i->second.listener->received(msg); + } else { + QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination); + } +} + void Channel::run() { try { while (true) { FrameSet::shared_ptr content = session.get(); //need to dispatch this to the relevant listener: if (content->isA()) { - ConsumerMap::iterator i = consumers.find(content->as()->getConsumerTag()); - if (i != consumers.end()) { - Message msg; - msg.populate(*content); - i->second.listener->received(msg); - } else { - QPID_LOG(warning, "Dropping message for unrecognised consumer: " << content->getMethod()); - } + dispatch(*content, content->as()->getConsumerTag()); + } else if (content->isA()) { + dispatch(*content, content->as()->getDestination()); } else if (content->isA()) { gets.push(content); } else { -- cgit v1.2.1