diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/AddressResolution.cpp | 29 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp | 16 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/OutgoingMessage.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SenderImpl.cpp | 1 |
4 files changed, 23 insertions, 25 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index 6cc7fcc587..a59e02a066 100644 --- a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -224,7 +224,6 @@ class ExchangeSink : public Exchange, public MessageSink void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message); void cancel(qpid::client::AsyncSession& session, const std::string& name); private: - const std::string defaultSubject; }; class QueueSink : public Queue, public MessageSink @@ -406,9 +405,8 @@ Subscription::Subscription(const Address& address, const std::string& exchangeTy const Variant& filter = address.getOption(FILTER); if (!filter.isVoid()) { - //TODO: if both subject _and_ filter are specified, - //combine in some way; for now we just ignore the - //subject in that case. + //TODO: if both subject _and_ filter are specified, combine in + //some way; for now we just ignore the subject in that case. bind(filter); } else if (address.hasSubject()) { //Note: This will not work for headers- or xml- exchange; @@ -459,9 +457,7 @@ void Subscription::cancel(qpid::client::AsyncSession& session, const std::string Subscription::Binding::Binding(const std::string& e, const std::string& k, const FieldTable& o): exchange(e), key(k), options(o) {} -void convert(qpid::messaging::Message& from, qpid::client::Message& to); - -ExchangeSink::ExchangeSink(const Address& address) : Exchange(address), defaultSubject(address.getSubject()) {} +ExchangeSink::ExchangeSink(const Address& address) : Exchange(address) {} void ExchangeSink::declare(qpid::client::AsyncSession& session, const std::string&) { @@ -471,9 +467,7 @@ void ExchangeSink::declare(qpid::client::AsyncSession& session, const std::strin void ExchangeSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m) { - if (m.message.getDeliveryProperties().getRoutingKey().empty() && !defaultSubject.empty()) { - m.message.getDeliveryProperties().setRoutingKey(defaultSubject); - } + m.message.getDeliveryProperties().setRoutingKey(m.getSubject()); m.status = session.messageTransfer(arg::destination=name, arg::content=m.message); } @@ -500,21 +494,6 @@ void QueueSink::cancel(qpid::client::AsyncSession& session, const std::string&) checkDelete(session, FOR_SENDER); } -void convert(qpid::messaging::Message& from, qpid::client::Message& to) -{ - //TODO: need to avoid copying as much as possible - to.setData(from.getContent()); - to.getDeliveryProperties().setRoutingKey(from.getSubject()); - //TODO: set other delivery properties - to.getMessageProperties().setContentType(from.getContentType()); - const Address& address = from.getReplyTo(); - if (!address.getName().empty()) { - to.getMessageProperties().setReplyTo(AddressResolution::convert(address)); - } - translate(from.getHeaders(), to.getMessageProperties().getApplicationHeaders()); - //TODO: set other message properties -} - Address AddressResolution::convert(const qpid::framing::ReplyTo& rt) { Address address; diff --git a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp index d3410ad76e..abd4f4c28c 100644 --- a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp +++ b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp @@ -48,4 +48,20 @@ void OutgoingMessage::convert(const qpid::messaging::Message& from) //TODO: set other delivery properties } +namespace { +const std::string SUBJECT("subject"); +} + +void OutgoingMessage::setSubject(const std::string& subject) +{ + if (!subject.empty()) { + message.getMessageProperties().getApplicationHeaders().setString(SUBJECT, subject); + } +} + +std::string OutgoingMessage::getSubject() const +{ + return message.getMessageProperties().getApplicationHeaders().getAsString(SUBJECT); +} + }}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h index 8801e4e769..0cdd2a2336 100644 --- a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h +++ b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h @@ -37,6 +37,8 @@ struct OutgoingMessage qpid::client::Completion status; void convert(const qpid::messaging::Message&); + void setSubject(const std::string& subject); + std::string getSubject() const; }; diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp index 24aaa054d2..4d6b9869e6 100644 --- a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp @@ -98,6 +98,7 @@ void SenderImpl::sendImpl(const qpid::messaging::Message& m) //TODO: make recording for replay optional (would still want to track completion however) std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage()); msg->convert(m); + msg->setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject()); outgoing.push_back(msg.release()); sink->send(session, name, outgoing.back()); } |