diff options
author | Gordon Sim <gsim@apache.org> | 2016-06-28 21:32:06 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2016-06-28 21:32:06 +0000 |
commit | ebb276cca41582b73223b55eff9f2d4386f4f746 (patch) | |
tree | 8aebe92f60aa035aa04861c7aa42af60c0481ec2 | |
parent | 13525713a1f57c4228982c79029f7a5486ced0e7 (diff) | |
download | qpid-python-ebb276cca41582b73223b55eff9f2d4386f4f746.tar.gz |
QPID-7329: Merge branch 'github/pr/10' into trunk
Closes #10
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1750587 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 13 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp | 16 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Outgoing.h | 2 |
4 files changed, 37 insertions, 1 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 625c6cceba..858e8748c4 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -345,6 +345,19 @@ void Queue::process(Message& msg) } } +void Queue::mergeMessageAnnotations(const QueueCursor& position, + const qpid::types::Variant::Map& messageAnnotations) +{ + Mutex::ScopedLock locker(messageLock); + Message *message = messages->find(position); + if (!message) return; + + qpid::types::Variant::Map::const_iterator it; + for (it = messageAnnotations.begin(); it != messageAnnotations.end(); ++it) { + message->addAnnotation(it->first, it->second); + } +} + void Queue::release(const QueueCursor& position, bool markRedelivered) { QueueListeners::NotificationSet copy; diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index cf4c1a85bf..4b63a413b8 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -332,6 +332,13 @@ class Queue : public boost::enable_shared_from_this<Queue>, QPID_BROKER_EXTERN void deliverTo(Message, TxBuffer* = 0); public: /** + * Merges message annotations for an in-memory message as a result of + * a modified disposition outcome + */ + QPID_BROKER_EXTERN void mergeMessageAnnotations(const QueueCursor& msg, + const qpid::types::Variant::Map& annotations); + + /** * Returns a message to the in-memory queue (due to lack * of acknowledegement from a receiver). If a consumer is * available it will be dispatched immediately, else it diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp index 20f32a1b37..abd96a61e9 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp @@ -18,6 +18,7 @@ * under the License. * */ +#include "qpid/broker/amqp/DataReader.h" #include "qpid/broker/amqp/Outgoing.h" #include "qpid/broker/amqp/Exception.h" #include "qpid/broker/amqp/Header.h" @@ -108,6 +109,19 @@ void OutgoingFromQueue::write(const char* data, size_t size) pn_link_send(link, data, size); } +void OutgoingFromQueue::mergeMessageAnnotationsIfRequired(const Record &r) +{ + pn_data_t *remoteAnnotationsRaw = + pn_disposition_annotations(pn_delivery_remote(r.delivery)); + if (remoteAnnotationsRaw == 0) { + return; + } + + qpid::types::Variant::Map remoteMessageAnnotations; + DataReader::read(remoteAnnotationsRaw, remoteMessageAnnotations); + queue->mergeMessageAnnotations(r.cursor, remoteMessageAnnotations); +} + void OutgoingFromQueue::handle(pn_delivery_t* delivery) { size_t i = Record::getIndex(pn_delivery_tag(delivery)); @@ -141,7 +155,7 @@ void OutgoingFromQueue::handle(pn_delivery_t* delivery) break; case PN_MODIFIED: if (preAcquires()) { - //TODO: handle message-annotations + mergeMessageAnnotationsIfRequired(r); if (pn_disposition_is_undeliverable(pn_delivery_remote(delivery))) { if (!trackingUndeliverableMessages) { // observe queue for changes to track undeliverable messages diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h index c56c8c0bf3..f4ca4691b3 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h @@ -152,6 +152,8 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, static size_t getIndex(pn_delivery_tag_t); }; + void mergeMessageAnnotationsIfRequired(const Record &r); + const bool exclusive; const bool isControllingUser; boost::shared_ptr<Queue> queue; |