summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client')
-rw-r--r--cpp/src/qpid/client/ClientChannel.cpp45
-rw-r--r--cpp/src/qpid/client/ClientChannel.h4
-rw-r--r--cpp/src/qpid/client/ClientMessage.h66
3 files changed, 53 insertions, 62 deletions
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp
index b77840f433..a014fd90c5 100644
--- a/cpp/src/qpid/client/ClientChannel.cpp
+++ b/cpp/src/qpid/client/ClientChannel.cpp
@@ -136,7 +136,7 @@ void Channel::rollback(){
}
void Channel::consume(
- Queue& queue, const std::string& tag, MessageListener* listener,
+ Queue& _queue, const std::string& tag, MessageListener* listener,
AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) {
if (tag.empty()) {
@@ -152,10 +152,11 @@ void Channel::consume(
c.ackMode = ackMode;
c.lastDeliveryTag = 0;
}
+ uint8_t confirmMode = ackMode == NO_ACK ? 0 : 1;
ScopedSync s(session, synch);
- session.basicConsume(0, queue.getName(), tag, noLocal,
- ackMode == NO_ACK, false, !synch,
- fields ? *fields : FieldTable());
+ session.messageSubscribe(0, _queue.getName(), tag, noLocal,
+ confirmMode, 0/*pre-acquire*/,
+ false, fields ? *fields : FieldTable());
}
void Channel::cancel(const std::string& tag, bool synch) {
@@ -169,7 +170,7 @@ void Channel::cancel(const std::string& tag, bool synch) {
consumers.erase(i);
}
ScopedSync s(session, synch);
- session.basicCancel(tag);
+ session.messageCancel(tag);
}
bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
@@ -184,14 +185,13 @@ bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
}
}
-void Channel::publish(const Message& msg, const Exchange& exchange,
+void Channel::publish(Message& msg, const Exchange& exchange,
const std::string& routingKey,
- bool mandatory, bool immediate) {
+ bool mandatory, bool /*immediate TODO-restore immediate?*/) {
- const string e = exchange.getName();
- string key = routingKey;
-
- session.basicPublish(0, e, key, mandatory, immediate, msg);
+ msg.getDeliveryProperties().setRoutingKey(routingKey);
+ msg.getDeliveryProperties().setDiscardUnroutable(!mandatory);
+ session.messageTransfer((destination=exchange.getName(), content=msg));
}
void Channel::close()
@@ -222,20 +222,27 @@ void Channel::join() {
}
}
+void Channel::dispatch(FrameSet& content, const std::string& destination)
+{
+ ConsumerMap::iterator i = consumers.find(destination);
+ if (i != consumers.end()) {
+ Message msg;
+ msg.populate(content);
+ i->second.listener->received(msg);
+ } else {
+ QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination);
+ }
+}
+
void Channel::run() {
try {
while (true) {
FrameSet::shared_ptr content = session.get();
//need to dispatch this to the relevant listener:
if (content->isA<BasicDeliverBody>()) {
- ConsumerMap::iterator i = consumers.find(content->as<BasicDeliverBody>()->getConsumerTag());
- if (i != consumers.end()) {
- Message msg;
- msg.populate(*content);
- i->second.listener->received(msg);
- } else {
- QPID_LOG(warning, "Dropping message for unrecognised consumer: " << content->getMethod());
- }
+ dispatch(*content, content->as<BasicDeliverBody>()->getConsumerTag());
+ } else if (content->isA<MessageTransferBody>()) {
+ dispatch(*content, content->as<MessageTransferBody>()->getDestination());
} else if (content->isA<BasicGetOkBody>()) {
gets.push(content);
} else {
diff --git a/cpp/src/qpid/client/ClientChannel.h b/cpp/src/qpid/client/ClientChannel.h
index 7ba4b0a246..9e5e3a2e70 100644
--- a/cpp/src/qpid/client/ClientChannel.h
+++ b/cpp/src/qpid/client/ClientChannel.h
@@ -93,6 +93,8 @@ class Channel : private sys::Runnable
void closeInternal();
void join();
+ void dispatch(framing::FrameSet& msg, const std::string& destination);
+
// FIXME aconway 2007-02-23: Get rid of friendships.
friend class Connection;
@@ -301,7 +303,7 @@ class Channel : private sys::Runnable
* receive this message on publication, the message will be
* returned (see setReturnedMessageHandler()).
*/
- void publish(const Message& msg, const Exchange& exchange,
+ void publish(Message& msg, const Exchange& exchange,
const std::string& routingKey,
bool mandatory = false, bool immediate = false);
diff --git a/cpp/src/qpid/client/ClientMessage.h b/cpp/src/qpid/client/ClientMessage.h
index 1afe5585a9..5c4eb4e5aa 100644
--- a/cpp/src/qpid/client/ClientMessage.h
+++ b/cpp/src/qpid/client/ClientMessage.h
@@ -22,13 +22,7 @@
*
*/
#include <string>
-#include "qpid/framing/BasicHeaderProperties.h"
-#include "qpid/framing/FrameSet.h"
-#include "qpid/framing/MethodContent.h"
-
-#include "qpid/framing/BasicDeliverBody.h"
-#include "qpid/framing/BasicGetOkBody.h"
-#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/TransferContent.h"
namespace qpid {
namespace client {
@@ -39,49 +33,37 @@ namespace client {
*
* \ingroup clientapi
*/
-// FIXME aconway 2007-04-05: Should be based on MessageTransfer properties not
-// basic header properties.
-class Message : public framing::BasicHeaderProperties, public framing::MethodContent {
- public:
- Message(const std::string& data_=std::string()) : data(data_) {}
-
- const std::string& getData() const { return data; }
- void setData(const std::string& _data) { data = _data; }
-
- std::string getDestination() const { return destination; }
- void setDestination(const std::string& dest) { destination = dest; }
+class Message : public framing::TransferContent
+{
+public:
+ Message(const std::string& data_=std::string()) : TransferContent(data_) {}
- // TODO aconway 2007-03-22: only needed for Basic.deliver support.
- uint64_t getDeliveryTag() const { return deliveryTag; }
- void setDeliveryTag(uint64_t dt) { deliveryTag = dt; }
-
- bool isRedelivered() const { return redelivered; }
- void setRedelivered(bool _redelivered){ redelivered = _redelivered; }
+ std::string getDestination() const
+ {
+ return destination;
+ }
+
+ void setDestination(const std::string& dest)
+ {
+ destination = dest;
+ }
- framing::AMQHeaderBody getHeader() const
+ bool isRedelivered() const
{
- framing::AMQHeaderBody header;
- BasicHeaderProperties* properties = header.get<BasicHeaderProperties>(true);
- BasicHeaderProperties::copy<BasicHeaderProperties, Message>(*properties, *this);
- properties->setContentLength(data.size());
- return header;
+ return hasDeliveryProperties() && getDeliveryProperties().getRedelivered();
}
- //TODO: move this elsewhere (GRS 24/08/2007)
- void populate(framing::FrameSet& frameset)
- {
- const BasicHeaderProperties* properties = frameset.getHeaders()->get<BasicHeaderProperties>();
- if (properties) {
- BasicHeaderProperties::copy<Message, BasicHeaderProperties>(*this, *properties);
- }
- frameset.getContent(data);
+ void setRedelivered(bool redelivered) {
+ getDeliveryProperties().setRedelivered(redelivered);
+ }
+
+ framing::FieldTable& getHeaders()
+ {
+ return getMessageProperties().getApplicationHeaders();
}
- private:
- std::string data;
+private:
std::string destination;
- bool redelivered;
- uint64_t deliveryTag;
};
}}