summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/messaging/amqp/SenderContext.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2012-11-15 14:42:22 +0000
committerGordon Sim <gsim@apache.org>2012-11-15 14:42:22 +0000
commitabe66b3f8e18fc8b6f016b922db24100ff834734 (patch)
tree2a10278bfdf15c18214d1642e1dd24eebd2a0a4f /cpp/src/qpid/messaging/amqp/SenderContext.cpp
parent2bce3b0dd247227bc5ff1dd3f17f6903c66e9cce (diff)
downloadqpid-python-abe66b3f8e18fc8b6f016b922db24100ff834734.tar.gz
QPID-4368: Add support for subject, translated to a filter (i.e. at present a binding key) by receivers and used as default value for senders
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1409813 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/messaging/amqp/SenderContext.cpp')
-rw-r--r--cpp/src/qpid/messaging/amqp/SenderContext.cpp29
1 files changed, 18 insertions, 11 deletions
diff --git a/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/cpp/src/qpid/messaging/amqp/SenderContext.cpp
index 1306a314be..02f7bdec1c 100644
--- a/cpp/src/qpid/messaging/amqp/SenderContext.cpp
+++ b/cpp/src/qpid/messaging/amqp/SenderContext.cpp
@@ -36,10 +36,10 @@ namespace qpid {
namespace messaging {
namespace amqp {
//TODO: proper conversion to wide string for address
-SenderContext::SenderContext(pn_session_t* session, const std::string& n, const std::string& t)
+SenderContext::SenderContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a)
: name(n),
- target(t),
- sender(pn_sender(session, target.c_str())), capacity(1000) {}
+ address(a),
+ sender(pn_sender(session, n.c_str())), capacity(1000) {}
SenderContext::~SenderContext()
{
@@ -74,7 +74,7 @@ const std::string& SenderContext::getName() const
const std::string& SenderContext::getTarget() const
{
- return target;
+ return address.getName();
}
SenderContext::Delivery* SenderContext::send(const qpid::messaging::Message& message)
@@ -82,7 +82,7 @@ SenderContext::Delivery* SenderContext::send(const qpid::messaging::Message& mes
if (processUnsettled() < capacity) {
deliveries.push_back(Delivery(nextId++));
Delivery& delivery = deliveries.back();
- delivery.encode(MessageImplAccess::get(message));
+ delivery.encode(MessageImplAccess::get(message), address);
delivery.send(sender);
return &delivery;
} else {
@@ -135,7 +135,7 @@ const std::string EMPTY;
class PropertiesAdapter : public qpid::amqp::MessageEncoder::Properties
{
public:
- PropertiesAdapter(const qpid::messaging::MessageImpl& impl) : msg(impl) {}
+ PropertiesAdapter(const qpid::messaging::MessageImpl& impl, const std::string& s) : msg(impl), subject(s) {}
bool hasMessageId() const
{
return getMessageId().size();
@@ -167,12 +167,12 @@ class PropertiesAdapter : public qpid::amqp::MessageEncoder::Properties
bool hasSubject() const
{
- return getSubject().size();
+ return subject.size() || getSubject().size();
}
std::string getSubject() const
{
- return msg.getSubject();
+ return subject.size() ? subject : msg.getSubject();
}
bool hasReplyTo() const
@@ -266,16 +266,23 @@ class PropertiesAdapter : public qpid::amqp::MessageEncoder::Properties
}
private:
const qpid::messaging::MessageImpl& msg;
+ const std::string subject;
};
+
+bool changedSubject(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address)
+{
+ return address.getSubject().size() && address.getSubject() != msg.getSubject();
+}
+
}
SenderContext::Delivery::Delivery(int32_t i) : id(i), token(0) {}
-void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg)
+void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address)
{
boost::shared_ptr<const EncodedMessage> original = msg.getEncoded();
- if (original) { //still have the content as received, send at least the bare message unaltered
+ if (original && !changedSubject(msg, address)) { //still have the content as received, send at least the bare message unaltered
//do we need to alter the header? are durable, priority, ttl, first-acquirer, delivery-count different from what was received?
if (original->hasHeaderChanged(msg)) {
//since as yet have no annotations, just write the revised header then the rest of the message as received
@@ -293,7 +300,7 @@ void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg)
}
} else {
HeaderAdapter header(msg);
- PropertiesAdapter properties(msg);
+ PropertiesAdapter properties(msg, address.getSubject());
//compute size:
encoded.resize(qpid::amqp::MessageEncoder::getEncodedSize(header, properties, msg.getHeaders(), msg.getBytes()));
QPID_LOG(debug, "Sending message, buffer is " << encoded.getSize() << " bytes")