summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/client/amqp0_10/AddressResolution.cpp29
-rw-r--r--cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp16
-rw-r--r--cpp/src/qpid/client/amqp0_10/OutgoingMessage.h2
-rw-r--r--cpp/src/qpid/client/amqp0_10/SenderImpl.cpp1
4 files changed, 23 insertions, 25 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
index 6cc7fcc587..a59e02a066 100644
--- a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
+++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
@@ -224,7 +224,6 @@ class ExchangeSink : public Exchange, public MessageSink
void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message);
void cancel(qpid::client::AsyncSession& session, const std::string& name);
private:
- const std::string defaultSubject;
};
class QueueSink : public Queue, public MessageSink
@@ -406,9 +405,8 @@ Subscription::Subscription(const Address& address, const std::string& exchangeTy
const Variant& filter = address.getOption(FILTER);
if (!filter.isVoid()) {
- //TODO: if both subject _and_ filter are specified,
- //combine in some way; for now we just ignore the
- //subject in that case.
+ //TODO: if both subject _and_ filter are specified, combine in
+ //some way; for now we just ignore the subject in that case.
bind(filter);
} else if (address.hasSubject()) {
//Note: This will not work for headers- or xml- exchange;
@@ -459,9 +457,7 @@ void Subscription::cancel(qpid::client::AsyncSession& session, const std::string
Subscription::Binding::Binding(const std::string& e, const std::string& k, const FieldTable& o):
exchange(e), key(k), options(o) {}
-void convert(qpid::messaging::Message& from, qpid::client::Message& to);
-
-ExchangeSink::ExchangeSink(const Address& address) : Exchange(address), defaultSubject(address.getSubject()) {}
+ExchangeSink::ExchangeSink(const Address& address) : Exchange(address) {}
void ExchangeSink::declare(qpid::client::AsyncSession& session, const std::string&)
{
@@ -471,9 +467,7 @@ void ExchangeSink::declare(qpid::client::AsyncSession& session, const std::strin
void ExchangeSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
{
- if (m.message.getDeliveryProperties().getRoutingKey().empty() && !defaultSubject.empty()) {
- m.message.getDeliveryProperties().setRoutingKey(defaultSubject);
- }
+ m.message.getDeliveryProperties().setRoutingKey(m.getSubject());
m.status = session.messageTransfer(arg::destination=name, arg::content=m.message);
}
@@ -500,21 +494,6 @@ void QueueSink::cancel(qpid::client::AsyncSession& session, const std::string&)
checkDelete(session, FOR_SENDER);
}
-void convert(qpid::messaging::Message& from, qpid::client::Message& to)
-{
- //TODO: need to avoid copying as much as possible
- to.setData(from.getContent());
- to.getDeliveryProperties().setRoutingKey(from.getSubject());
- //TODO: set other delivery properties
- to.getMessageProperties().setContentType(from.getContentType());
- const Address& address = from.getReplyTo();
- if (!address.getName().empty()) {
- to.getMessageProperties().setReplyTo(AddressResolution::convert(address));
- }
- translate(from.getHeaders(), to.getMessageProperties().getApplicationHeaders());
- //TODO: set other message properties
-}
-
Address AddressResolution::convert(const qpid::framing::ReplyTo& rt)
{
Address address;
diff --git a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
index d3410ad76e..abd4f4c28c 100644
--- a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
+++ b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
@@ -48,4 +48,20 @@ void OutgoingMessage::convert(const qpid::messaging::Message& from)
//TODO: set other delivery properties
}
+namespace {
+const std::string SUBJECT("subject");
+}
+
+void OutgoingMessage::setSubject(const std::string& subject)
+{
+ if (!subject.empty()) {
+ message.getMessageProperties().getApplicationHeaders().setString(SUBJECT, subject);
+ }
+}
+
+std::string OutgoingMessage::getSubject() const
+{
+ return message.getMessageProperties().getApplicationHeaders().getAsString(SUBJECT);
+}
+
}}} // namespace qpid::client::amqp0_10
diff --git a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h
index 8801e4e769..0cdd2a2336 100644
--- a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h
+++ b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h
@@ -37,6 +37,8 @@ struct OutgoingMessage
qpid::client::Completion status;
void convert(const qpid::messaging::Message&);
+ void setSubject(const std::string& subject);
+ std::string getSubject() const;
};
diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
index 24aaa054d2..4d6b9869e6 100644
--- a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
@@ -98,6 +98,7 @@ void SenderImpl::sendImpl(const qpid::messaging::Message& m)
//TODO: make recording for replay optional (would still want to track completion however)
std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage());
msg->convert(m);
+ msg->setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject());
outgoing.push_back(msg.release());
sink->send(session, name, outgoing.back());
}