diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp | 16 |
1 files changed, 15 insertions, 1 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp index 20f32a1b37..abd96a61e9 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp @@ -18,6 +18,7 @@ * under the License. * */ +#include "qpid/broker/amqp/DataReader.h" #include "qpid/broker/amqp/Outgoing.h" #include "qpid/broker/amqp/Exception.h" #include "qpid/broker/amqp/Header.h" @@ -108,6 +109,19 @@ void OutgoingFromQueue::write(const char* data, size_t size) pn_link_send(link, data, size); } +void OutgoingFromQueue::mergeMessageAnnotationsIfRequired(const Record &r) +{ + pn_data_t *remoteAnnotationsRaw = + pn_disposition_annotations(pn_delivery_remote(r.delivery)); + if (remoteAnnotationsRaw == 0) { + return; + } + + qpid::types::Variant::Map remoteMessageAnnotations; + DataReader::read(remoteAnnotationsRaw, remoteMessageAnnotations); + queue->mergeMessageAnnotations(r.cursor, remoteMessageAnnotations); +} + void OutgoingFromQueue::handle(pn_delivery_t* delivery) { size_t i = Record::getIndex(pn_delivery_tag(delivery)); @@ -141,7 +155,7 @@ void OutgoingFromQueue::handle(pn_delivery_t* delivery) break; case PN_MODIFIED: if (preAcquires()) { - //TODO: handle message-annotations + mergeMessageAnnotationsIfRequired(r); if (pn_disposition_is_undeliverable(pn_delivery_remote(delivery))) { if (!trackingUndeliverableMessages) { // observe queue for changes to track undeliverable messages |