diff options
author | Gordon Sim <gsim@apache.org> | 2012-11-15 14:42:22 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2012-11-15 14:42:22 +0000 |
commit | abe66b3f8e18fc8b6f016b922db24100ff834734 (patch) | |
tree | 2a10278bfdf15c18214d1642e1dd24eebd2a0a4f /cpp/src/qpid/messaging/amqp/SenderContext.cpp | |
parent | 2bce3b0dd247227bc5ff1dd3f17f6903c66e9cce (diff) | |
download | qpid-python-abe66b3f8e18fc8b6f016b922db24100ff834734.tar.gz |
QPID-4368: Add support for subject, translated to a filter (i.e. at present a binding key) by receivers and used as default value for senders
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1409813 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/messaging/amqp/SenderContext.cpp')
-rw-r--r-- | cpp/src/qpid/messaging/amqp/SenderContext.cpp | 29 |
1 files changed, 18 insertions, 11 deletions
diff --git a/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/cpp/src/qpid/messaging/amqp/SenderContext.cpp index 1306a314be..02f7bdec1c 100644 --- a/cpp/src/qpid/messaging/amqp/SenderContext.cpp +++ b/cpp/src/qpid/messaging/amqp/SenderContext.cpp @@ -36,10 +36,10 @@ namespace qpid { namespace messaging { namespace amqp { //TODO: proper conversion to wide string for address -SenderContext::SenderContext(pn_session_t* session, const std::string& n, const std::string& t) +SenderContext::SenderContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a) : name(n), - target(t), - sender(pn_sender(session, target.c_str())), capacity(1000) {} + address(a), + sender(pn_sender(session, n.c_str())), capacity(1000) {} SenderContext::~SenderContext() { @@ -74,7 +74,7 @@ const std::string& SenderContext::getName() const const std::string& SenderContext::getTarget() const { - return target; + return address.getName(); } SenderContext::Delivery* SenderContext::send(const qpid::messaging::Message& message) @@ -82,7 +82,7 @@ SenderContext::Delivery* SenderContext::send(const qpid::messaging::Message& mes if (processUnsettled() < capacity) { deliveries.push_back(Delivery(nextId++)); Delivery& delivery = deliveries.back(); - delivery.encode(MessageImplAccess::get(message)); + delivery.encode(MessageImplAccess::get(message), address); delivery.send(sender); return &delivery; } else { @@ -135,7 +135,7 @@ const std::string EMPTY; class PropertiesAdapter : public qpid::amqp::MessageEncoder::Properties { public: - PropertiesAdapter(const qpid::messaging::MessageImpl& impl) : msg(impl) {} + PropertiesAdapter(const qpid::messaging::MessageImpl& impl, const std::string& s) : msg(impl), subject(s) {} bool hasMessageId() const { return getMessageId().size(); @@ -167,12 +167,12 @@ class PropertiesAdapter : public qpid::amqp::MessageEncoder::Properties bool hasSubject() const { - return getSubject().size(); + return subject.size() || getSubject().size(); } std::string getSubject() const { - return msg.getSubject(); + return subject.size() ? subject : msg.getSubject(); } bool hasReplyTo() const @@ -266,16 +266,23 @@ class PropertiesAdapter : public qpid::amqp::MessageEncoder::Properties } private: const qpid::messaging::MessageImpl& msg; + const std::string subject; }; + +bool changedSubject(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address) +{ + return address.getSubject().size() && address.getSubject() != msg.getSubject(); +} + } SenderContext::Delivery::Delivery(int32_t i) : id(i), token(0) {} -void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg) +void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address) { boost::shared_ptr<const EncodedMessage> original = msg.getEncoded(); - if (original) { //still have the content as received, send at least the bare message unaltered + if (original && !changedSubject(msg, address)) { //still have the content as received, send at least the bare message unaltered //do we need to alter the header? are durable, priority, ttl, first-acquirer, delivery-count different from what was received? if (original->hasHeaderChanged(msg)) { //since as yet have no annotations, just write the revised header then the rest of the message as received @@ -293,7 +300,7 @@ void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg) } } else { HeaderAdapter header(msg); - PropertiesAdapter properties(msg); + PropertiesAdapter properties(msg, address.getSubject()); //compute size: encoded.resize(qpid::amqp::MessageEncoder::getEncodedSize(header, properties, msg.getHeaders(), msg.getBytes())); QPID_LOG(debug, "Sending message, buffer is " << encoded.getSize() << " bytes") |