diff options
author | Gordon Sim <gsim@apache.org> | 2007-09-11 11:25:27 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-09-11 11:25:27 +0000 |
commit | c922ccae07d060f891848e688f7f1e29dc07c552 (patch) | |
tree | 8363c1678c5efc59769c19c58188ccb9466d8aa4 /cpp/src/qpid/client/ClientChannel.cpp | |
parent | fbda2ac45519f7108fc48f483d76d1487c2b3544 (diff) | |
download | qpid-python-c922ccae07d060f891848e688f7f1e29dc07c552.tar.gz |
Moved old ClientChannel class from using basic to using message for publish & consume.
(Get and qos still use the basic class's defintions, that will be changed next)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@574551 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/ClientChannel.cpp')
-rw-r--r-- | cpp/src/qpid/client/ClientChannel.cpp | 45 |
1 files changed, 26 insertions, 19 deletions
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp index b77840f433..a014fd90c5 100644 --- a/cpp/src/qpid/client/ClientChannel.cpp +++ b/cpp/src/qpid/client/ClientChannel.cpp @@ -136,7 +136,7 @@ void Channel::rollback(){ } void Channel::consume( - Queue& queue, const std::string& tag, MessageListener* listener, + Queue& _queue, const std::string& tag, MessageListener* listener, AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) { if (tag.empty()) { @@ -152,10 +152,11 @@ void Channel::consume( c.ackMode = ackMode; c.lastDeliveryTag = 0; } + uint8_t confirmMode = ackMode == NO_ACK ? 0 : 1; ScopedSync s(session, synch); - session.basicConsume(0, queue.getName(), tag, noLocal, - ackMode == NO_ACK, false, !synch, - fields ? *fields : FieldTable()); + session.messageSubscribe(0, _queue.getName(), tag, noLocal, + confirmMode, 0/*pre-acquire*/, + false, fields ? *fields : FieldTable()); } void Channel::cancel(const std::string& tag, bool synch) { @@ -169,7 +170,7 @@ void Channel::cancel(const std::string& tag, bool synch) { consumers.erase(i); } ScopedSync s(session, synch); - session.basicCancel(tag); + session.messageCancel(tag); } bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) { @@ -184,14 +185,13 @@ bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) { } } -void Channel::publish(const Message& msg, const Exchange& exchange, +void Channel::publish(Message& msg, const Exchange& exchange, const std::string& routingKey, - bool mandatory, bool immediate) { + bool mandatory, bool /*immediate TODO-restore immediate?*/) { - const string e = exchange.getName(); - string key = routingKey; - - session.basicPublish(0, e, key, mandatory, immediate, msg); + msg.getDeliveryProperties().setRoutingKey(routingKey); + msg.getDeliveryProperties().setDiscardUnroutable(!mandatory); + session.messageTransfer((destination=exchange.getName(), content=msg)); } void Channel::close() @@ -222,20 +222,27 @@ void Channel::join() { } } +void Channel::dispatch(FrameSet& content, const std::string& destination) +{ + ConsumerMap::iterator i = consumers.find(destination); + if (i != consumers.end()) { + Message msg; + msg.populate(content); + i->second.listener->received(msg); + } else { + QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination); + } +} + void Channel::run() { try { while (true) { FrameSet::shared_ptr content = session.get(); //need to dispatch this to the relevant listener: if (content->isA<BasicDeliverBody>()) { - ConsumerMap::iterator i = consumers.find(content->as<BasicDeliverBody>()->getConsumerTag()); - if (i != consumers.end()) { - Message msg; - msg.populate(*content); - i->second.listener->received(msg); - } else { - QPID_LOG(warning, "Dropping message for unrecognised consumer: " << content->getMethod()); - } + dispatch(*content, content->as<BasicDeliverBody>()->getConsumerTag()); + } else if (content->isA<MessageTransferBody>()) { + dispatch(*content, content->as<MessageTransferBody>()->getDestination()); } else if (content->isA<BasicGetOkBody>()) { gets.push(content); } else { |