From ebb276cca41582b73223b55eff9f2d4386f4f746 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Tue, 28 Jun 2016 21:32:06 +0000 Subject: QPID-7329: Merge branch 'github/pr/10' into trunk Closes #10 git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1750587 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/Queue.cpp | 13 +++++++++++++ qpid/cpp/src/qpid/broker/Queue.h | 7 +++++++ qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp | 16 +++++++++++++++- qpid/cpp/src/qpid/broker/amqp/Outgoing.h | 2 ++ 4 files changed, 37 insertions(+), 1 deletion(-) diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 625c6cceba..858e8748c4 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -345,6 +345,19 @@ void Queue::process(Message& msg) } } +void Queue::mergeMessageAnnotations(const QueueCursor& position, + const qpid::types::Variant::Map& messageAnnotations) +{ + Mutex::ScopedLock locker(messageLock); + Message *message = messages->find(position); + if (!message) return; + + qpid::types::Variant::Map::const_iterator it; + for (it = messageAnnotations.begin(); it != messageAnnotations.end(); ++it) { + message->addAnnotation(it->first, it->second); + } +} + void Queue::release(const QueueCursor& position, bool markRedelivered) { QueueListeners::NotificationSet copy; diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index cf4c1a85bf..4b63a413b8 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -331,6 +331,13 @@ class Queue : public boost::enable_shared_from_this, private: QPID_BROKER_EXTERN void deliverTo(Message, TxBuffer* = 0); public: + /** + * Merges message annotations for an in-memory message as a result of + * a modified disposition outcome + */ + QPID_BROKER_EXTERN void mergeMessageAnnotations(const QueueCursor& msg, + const qpid::types::Variant::Map& annotations); + /** * Returns a message to the in-memory queue (due to lack * of acknowledegement from a receiver). If a consumer is diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp index 20f32a1b37..abd96a61e9 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp @@ -18,6 +18,7 @@ * under the License. * */ +#include "qpid/broker/amqp/DataReader.h" #include "qpid/broker/amqp/Outgoing.h" #include "qpid/broker/amqp/Exception.h" #include "qpid/broker/amqp/Header.h" @@ -108,6 +109,19 @@ void OutgoingFromQueue::write(const char* data, size_t size) pn_link_send(link, data, size); } +void OutgoingFromQueue::mergeMessageAnnotationsIfRequired(const Record &r) +{ + pn_data_t *remoteAnnotationsRaw = + pn_disposition_annotations(pn_delivery_remote(r.delivery)); + if (remoteAnnotationsRaw == 0) { + return; + } + + qpid::types::Variant::Map remoteMessageAnnotations; + DataReader::read(remoteAnnotationsRaw, remoteMessageAnnotations); + queue->mergeMessageAnnotations(r.cursor, remoteMessageAnnotations); +} + void OutgoingFromQueue::handle(pn_delivery_t* delivery) { size_t i = Record::getIndex(pn_delivery_tag(delivery)); @@ -141,7 +155,7 @@ void OutgoingFromQueue::handle(pn_delivery_t* delivery) break; case PN_MODIFIED: if (preAcquires()) { - //TODO: handle message-annotations + mergeMessageAnnotationsIfRequired(r); if (pn_disposition_is_undeliverable(pn_delivery_remote(delivery))) { if (!trackingUndeliverableMessages) { // observe queue for changes to track undeliverable messages diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h index c56c8c0bf3..f4ca4691b3 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h @@ -152,6 +152,8 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, static size_t getIndex(pn_delivery_tag_t); }; + void mergeMessageAnnotationsIfRequired(const Record &r); + const bool exclusive; const bool isControllingUser; boost::shared_ptr queue; -- cgit v1.2.1