summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/ClientChannel.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-09-11 11:25:27 +0000
committerGordon Sim <gsim@apache.org>2007-09-11 11:25:27 +0000
commitc922ccae07d060f891848e688f7f1e29dc07c552 (patch)
tree8363c1678c5efc59769c19c58188ccb9466d8aa4 /cpp/src/qpid/client/ClientChannel.cpp
parentfbda2ac45519f7108fc48f483d76d1487c2b3544 (diff)
downloadqpid-python-c922ccae07d060f891848e688f7f1e29dc07c552.tar.gz
Moved old ClientChannel class from using basic to using message for publish & consume.
(Get and qos still use the basic class's defintions, that will be changed next) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@574551 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/ClientChannel.cpp')
-rw-r--r--cpp/src/qpid/client/ClientChannel.cpp45
1 files changed, 26 insertions, 19 deletions
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp
index b77840f433..a014fd90c5 100644
--- a/cpp/src/qpid/client/ClientChannel.cpp
+++ b/cpp/src/qpid/client/ClientChannel.cpp
@@ -136,7 +136,7 @@ void Channel::rollback(){
}
void Channel::consume(
- Queue& queue, const std::string& tag, MessageListener* listener,
+ Queue& _queue, const std::string& tag, MessageListener* listener,
AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) {
if (tag.empty()) {
@@ -152,10 +152,11 @@ void Channel::consume(
c.ackMode = ackMode;
c.lastDeliveryTag = 0;
}
+ uint8_t confirmMode = ackMode == NO_ACK ? 0 : 1;
ScopedSync s(session, synch);
- session.basicConsume(0, queue.getName(), tag, noLocal,
- ackMode == NO_ACK, false, !synch,
- fields ? *fields : FieldTable());
+ session.messageSubscribe(0, _queue.getName(), tag, noLocal,
+ confirmMode, 0/*pre-acquire*/,
+ false, fields ? *fields : FieldTable());
}
void Channel::cancel(const std::string& tag, bool synch) {
@@ -169,7 +170,7 @@ void Channel::cancel(const std::string& tag, bool synch) {
consumers.erase(i);
}
ScopedSync s(session, synch);
- session.basicCancel(tag);
+ session.messageCancel(tag);
}
bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
@@ -184,14 +185,13 @@ bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
}
}
-void Channel::publish(const Message& msg, const Exchange& exchange,
+void Channel::publish(Message& msg, const Exchange& exchange,
const std::string& routingKey,
- bool mandatory, bool immediate) {
+ bool mandatory, bool /*immediate TODO-restore immediate?*/) {
- const string e = exchange.getName();
- string key = routingKey;
-
- session.basicPublish(0, e, key, mandatory, immediate, msg);
+ msg.getDeliveryProperties().setRoutingKey(routingKey);
+ msg.getDeliveryProperties().setDiscardUnroutable(!mandatory);
+ session.messageTransfer((destination=exchange.getName(), content=msg));
}
void Channel::close()
@@ -222,20 +222,27 @@ void Channel::join() {
}
}
+void Channel::dispatch(FrameSet& content, const std::string& destination)
+{
+ ConsumerMap::iterator i = consumers.find(destination);
+ if (i != consumers.end()) {
+ Message msg;
+ msg.populate(content);
+ i->second.listener->received(msg);
+ } else {
+ QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination);
+ }
+}
+
void Channel::run() {
try {
while (true) {
FrameSet::shared_ptr content = session.get();
//need to dispatch this to the relevant listener:
if (content->isA<BasicDeliverBody>()) {
- ConsumerMap::iterator i = consumers.find(content->as<BasicDeliverBody>()->getConsumerTag());
- if (i != consumers.end()) {
- Message msg;
- msg.populate(*content);
- i->second.listener->received(msg);
- } else {
- QPID_LOG(warning, "Dropping message for unrecognised consumer: " << content->getMethod());
- }
+ dispatch(*content, content->as<BasicDeliverBody>()->getConsumerTag());
+ } else if (content->isA<MessageTransferBody>()) {
+ dispatch(*content, content->as<MessageTransferBody>()->getDestination());
} else if (content->isA<BasicGetOkBody>()) {
gets.push(content);
} else {