diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/client/ClientChannel.cpp | 45 | ||||
-rw-r--r-- | cpp/src/qpid/client/ClientChannel.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/client/ClientMessage.h | 66 | ||||
-rw-r--r-- | cpp/src/qpid/framing/TransferContent.cpp | 31 | ||||
-rw-r--r-- | cpp/src/qpid/framing/TransferContent.h | 13 | ||||
-rw-r--r-- | cpp/src/tests/interop_runner.cpp | 36 |
6 files changed, 109 insertions, 86 deletions
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp index b77840f433..a014fd90c5 100644 --- a/cpp/src/qpid/client/ClientChannel.cpp +++ b/cpp/src/qpid/client/ClientChannel.cpp @@ -136,7 +136,7 @@ void Channel::rollback(){ } void Channel::consume( - Queue& queue, const std::string& tag, MessageListener* listener, + Queue& _queue, const std::string& tag, MessageListener* listener, AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) { if (tag.empty()) { @@ -152,10 +152,11 @@ void Channel::consume( c.ackMode = ackMode; c.lastDeliveryTag = 0; } + uint8_t confirmMode = ackMode == NO_ACK ? 0 : 1; ScopedSync s(session, synch); - session.basicConsume(0, queue.getName(), tag, noLocal, - ackMode == NO_ACK, false, !synch, - fields ? *fields : FieldTable()); + session.messageSubscribe(0, _queue.getName(), tag, noLocal, + confirmMode, 0/*pre-acquire*/, + false, fields ? *fields : FieldTable()); } void Channel::cancel(const std::string& tag, bool synch) { @@ -169,7 +170,7 @@ void Channel::cancel(const std::string& tag, bool synch) { consumers.erase(i); } ScopedSync s(session, synch); - session.basicCancel(tag); + session.messageCancel(tag); } bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) { @@ -184,14 +185,13 @@ bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) { } } -void Channel::publish(const Message& msg, const Exchange& exchange, +void Channel::publish(Message& msg, const Exchange& exchange, const std::string& routingKey, - bool mandatory, bool immediate) { + bool mandatory, bool /*immediate TODO-restore immediate?*/) { - const string e = exchange.getName(); - string key = routingKey; - - session.basicPublish(0, e, key, mandatory, immediate, msg); + msg.getDeliveryProperties().setRoutingKey(routingKey); + msg.getDeliveryProperties().setDiscardUnroutable(!mandatory); + session.messageTransfer((destination=exchange.getName(), content=msg)); } void Channel::close() @@ -222,20 +222,27 @@ void Channel::join() { } } +void Channel::dispatch(FrameSet& content, const std::string& destination) +{ + ConsumerMap::iterator i = consumers.find(destination); + if (i != consumers.end()) { + Message msg; + msg.populate(content); + i->second.listener->received(msg); + } else { + QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination); + } +} + void Channel::run() { try { while (true) { FrameSet::shared_ptr content = session.get(); //need to dispatch this to the relevant listener: if (content->isA<BasicDeliverBody>()) { - ConsumerMap::iterator i = consumers.find(content->as<BasicDeliverBody>()->getConsumerTag()); - if (i != consumers.end()) { - Message msg; - msg.populate(*content); - i->second.listener->received(msg); - } else { - QPID_LOG(warning, "Dropping message for unrecognised consumer: " << content->getMethod()); - } + dispatch(*content, content->as<BasicDeliverBody>()->getConsumerTag()); + } else if (content->isA<MessageTransferBody>()) { + dispatch(*content, content->as<MessageTransferBody>()->getDestination()); } else if (content->isA<BasicGetOkBody>()) { gets.push(content); } else { diff --git a/cpp/src/qpid/client/ClientChannel.h b/cpp/src/qpid/client/ClientChannel.h index 7ba4b0a246..9e5e3a2e70 100644 --- a/cpp/src/qpid/client/ClientChannel.h +++ b/cpp/src/qpid/client/ClientChannel.h @@ -93,6 +93,8 @@ class Channel : private sys::Runnable void closeInternal(); void join(); + void dispatch(framing::FrameSet& msg, const std::string& destination); + // FIXME aconway 2007-02-23: Get rid of friendships. friend class Connection; @@ -301,7 +303,7 @@ class Channel : private sys::Runnable * receive this message on publication, the message will be * returned (see setReturnedMessageHandler()). */ - void publish(const Message& msg, const Exchange& exchange, + void publish(Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory = false, bool immediate = false); diff --git a/cpp/src/qpid/client/ClientMessage.h b/cpp/src/qpid/client/ClientMessage.h index 1afe5585a9..5c4eb4e5aa 100644 --- a/cpp/src/qpid/client/ClientMessage.h +++ b/cpp/src/qpid/client/ClientMessage.h @@ -22,13 +22,7 @@ * */ #include <string> -#include "qpid/framing/BasicHeaderProperties.h" -#include "qpid/framing/FrameSet.h" -#include "qpid/framing/MethodContent.h" - -#include "qpid/framing/BasicDeliverBody.h" -#include "qpid/framing/BasicGetOkBody.h" -#include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/TransferContent.h" namespace qpid { namespace client { @@ -39,49 +33,37 @@ namespace client { * * \ingroup clientapi */ -// FIXME aconway 2007-04-05: Should be based on MessageTransfer properties not -// basic header properties. -class Message : public framing::BasicHeaderProperties, public framing::MethodContent { - public: - Message(const std::string& data_=std::string()) : data(data_) {} - - const std::string& getData() const { return data; } - void setData(const std::string& _data) { data = _data; } - - std::string getDestination() const { return destination; } - void setDestination(const std::string& dest) { destination = dest; } +class Message : public framing::TransferContent +{ +public: + Message(const std::string& data_=std::string()) : TransferContent(data_) {} - // TODO aconway 2007-03-22: only needed for Basic.deliver support. - uint64_t getDeliveryTag() const { return deliveryTag; } - void setDeliveryTag(uint64_t dt) { deliveryTag = dt; } - - bool isRedelivered() const { return redelivered; } - void setRedelivered(bool _redelivered){ redelivered = _redelivered; } + std::string getDestination() const + { + return destination; + } + + void setDestination(const std::string& dest) + { + destination = dest; + } - framing::AMQHeaderBody getHeader() const + bool isRedelivered() const { - framing::AMQHeaderBody header; - BasicHeaderProperties* properties = header.get<BasicHeaderProperties>(true); - BasicHeaderProperties::copy<BasicHeaderProperties, Message>(*properties, *this); - properties->setContentLength(data.size()); - return header; + return hasDeliveryProperties() && getDeliveryProperties().getRedelivered(); } - //TODO: move this elsewhere (GRS 24/08/2007) - void populate(framing::FrameSet& frameset) - { - const BasicHeaderProperties* properties = frameset.getHeaders()->get<BasicHeaderProperties>(); - if (properties) { - BasicHeaderProperties::copy<Message, BasicHeaderProperties>(*this, *properties); - } - frameset.getContent(data); + void setRedelivered(bool redelivered) { + getDeliveryProperties().setRedelivered(redelivered); + } + + framing::FieldTable& getHeaders() + { + return getMessageProperties().getApplicationHeaders(); } - private: - std::string data; +private: std::string destination; - bool redelivered; - uint64_t deliveryTag; }; }} diff --git a/cpp/src/qpid/framing/TransferContent.cpp b/cpp/src/qpid/framing/TransferContent.cpp index 1faa24ae0c..4c2d06ae42 100644 --- a/cpp/src/qpid/framing/TransferContent.cpp +++ b/cpp/src/qpid/framing/TransferContent.cpp @@ -61,4 +61,35 @@ DeliveryProperties& TransferContent::getDeliveryProperties() return *header.get<DeliveryProperties>(true); } +void TransferContent::populate(const FrameSet& frameset) +{ + header = *frameset.getHeaders(); + frameset.getContent(data); +} + +const MessageProperties& TransferContent::getMessageProperties() const +{ + const MessageProperties* props = header.get<MessageProperties>(); + if (!props) throw NoSuchPropertiesException(); + return *props; +} + +const DeliveryProperties& TransferContent::getDeliveryProperties() const +{ + const DeliveryProperties* props = header.get<DeliveryProperties>(); + if (!props) throw NoSuchPropertiesException(); + return *props; +} + +bool TransferContent::hasMessageProperties() const +{ + return header.get<MessageProperties>(); +} + +bool TransferContent::hasDeliveryProperties() const +{ + return header.get<DeliveryProperties>(); +} + + }} diff --git a/cpp/src/qpid/framing/TransferContent.h b/cpp/src/qpid/framing/TransferContent.h index 14ba209f4b..c5fc395c94 100644 --- a/cpp/src/qpid/framing/TransferContent.h +++ b/cpp/src/qpid/framing/TransferContent.h @@ -21,13 +21,17 @@ #ifndef _TransferContent_ #define _TransferContent_ +#include "FrameSet.h" #include "MethodContent.h" +#include "qpid/Exception.h" #include "qpid/framing/MessageProperties.h" #include "qpid/framing/DeliveryProperties.h" namespace qpid { namespace framing { +struct NoSuchPropertiesException : public Exception {}; + class TransferContent : public MethodContent { AMQHeaderBody header; @@ -37,9 +41,16 @@ public: AMQHeaderBody getHeader() const; void setData(const std::string&); void appendData(const std::string&); - const std::string& getData() const; MessageProperties& getMessageProperties(); DeliveryProperties& getDeliveryProperties(); + + const std::string& getData() const; + const MessageProperties& getMessageProperties() const; + const DeliveryProperties& getDeliveryProperties() const; + bool hasMessageProperties() const; + bool hasDeliveryProperties() const; + + void populate(const FrameSet& frameset); }; }} diff --git a/cpp/src/tests/interop_runner.cpp b/cpp/src/tests/interop_runner.cpp index 7c47edafa0..1b87512857 100644 --- a/cpp/src/tests/interop_runner.cpp +++ b/cpp/src/tests/interop_runner.cpp @@ -47,6 +47,7 @@ using namespace qpid::sys; using qpid::TestCase; using qpid::TestOptions; using qpid::framing::FieldTable; +using qpid::framing::ReplyTo; using namespace std; class DummyRun : public TestCase @@ -73,14 +74,14 @@ class Listener : public MessageListener, private Runnable{ const string topic; TestMap::iterator test; auto_ptr<Thread> runner; - string reportTo; + ReplyTo reportTo; string reportCorrelator; void shutdown(); bool invite(const string& name); void run(); - void sendResponse(Message& response, string replyTo); + void sendResponse(Message& response, ReplyTo replyTo); void sendResponse(Message& response, Message& request); void sendSimpleResponse(const string& type, Message& request); void sendReport(); @@ -146,30 +147,19 @@ void Listener::sendSimpleResponse(const string& type, Message& request) response.getHeaders().setString("CONTROL_TYPE", type); response.getHeaders().setString("CLIENT_NAME", name); response.getHeaders().setString("CLIENT_PRIVATE_CONTROL_KEY", topic); - response.setCorrelationId(request.getCorrelationId()); + response.getMessageProperties().setCorrelationId(request.getMessageProperties().getCorrelationId()); sendResponse(response, request); } void Listener::sendResponse(Message& response, Message& request) { - sendResponse(response, request.getReplyTo()); + sendResponse(response, request.getMessageProperties().getReplyTo()); } -void Listener::sendResponse(Message& response, string replyTo) +void Listener::sendResponse(Message& response, ReplyTo replyTo) { - //Exchange and routing key need to be extracted from the reply-to - //field. Format is assumed to be: - // - // <exchange type>://<exchange name>/<routing key>?<options> - // - //and all we need is the exchange name and routing key - // - if (replyTo.empty()) throw qpid::Exception("Reply address not set!"); - const string delims(":/?="); - - string::size_type start = replyTo.find(':');//skip exchange type - string exchange = parse_next_word(replyTo, delims, start); - string routingKey = parse_next_word(replyTo, delims, start); + string exchange = replyTo.getExchangeName(); + string routingKey = replyTo.getRoutingKey(); channel.publish(response, exchange, routingKey); } @@ -188,12 +178,12 @@ void Listener::received(Message& message) test->assign(message.getHeaders().getString("ROLE"), message.getHeaders(), options); sendSimpleResponse("ACCEPT_ROLE", message); } else if (type == "START") { - reportTo = message.getReplyTo(); - reportCorrelator = message.getCorrelationId(); + reportTo = message.getMessageProperties().getReplyTo(); + reportCorrelator = message.getMessageProperties().getCorrelationId(); runner = auto_ptr<Thread>(new Thread(this)); } else if (type == "STATUS_REQUEST") { - reportTo = message.getReplyTo(); - reportCorrelator = message.getCorrelationId(); + reportTo = message.getMessageProperties().getReplyTo(); + reportCorrelator = message.getMessageProperties().getCorrelationId(); test->stop(); sendReport(); } else if (type == "TERMINATE") { @@ -229,7 +219,7 @@ void Listener::sendReport() Message report; report.getHeaders().setString("CONTROL_TYPE", "REPORT"); test->report(report); - report.setCorrelationId(reportCorrelator); + report.getMessageProperties().setCorrelationId(reportCorrelator); sendResponse(report, reportTo); } |