summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2016-06-27 15:56:42 +0000
committerGordon Sim <gsim@apache.org>2016-06-27 15:56:42 +0000
commit590ac0b0c52a838b5cb1761b79d17d9b78256b2e (patch)
tree5cc2ddb5a520a18d7b24d86ad02c79b84a42428a /qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
parentc077bea50c85a48731ecfd9e7f61f7c949b0749b (diff)
downloadqpid-python-590ac0b0c52a838b5cb1761b79d17d9b78256b2e.tar.gz
feat(disposition): support undeliverable-here in modified outcomes
Previously, specifying `undeliverable-here` as `true` in a modified outcome simply resulted in the message being rejected. This patch adds tracking to the outgoing link management in order to not redeliver messages to links that indicate that messages are not deliverable there. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1750369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp31
1 files changed, 26 insertions, 5 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
index 90c268418f..20f32a1b37 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
@@ -69,7 +69,8 @@ OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source,
buffer(1024)/*used only for header at present*/,
//for exclusive queues, assume unreliable unless reliable is explicitly requested; otherwise assume reliable unless unreliable requested
unreliable(exclusive ? !requested_reliable(link) : requested_unreliable(link)),
- cancelled(false)
+ cancelled(false),
+ trackingUndeliverableMessages(false)
{
for (size_t i = 0 ; i < deliveries.capacity(); ++i) {
deliveries[i].init(i);
@@ -142,11 +143,17 @@ void OutgoingFromQueue::handle(pn_delivery_t* delivery)
if (preAcquires()) {
//TODO: handle message-annotations
if (pn_disposition_is_undeliverable(pn_delivery_remote(delivery))) {
- //treat undeliverable here as rejection
- queue->reject(r.cursor);
- } else {
- queue->release(r.cursor, pn_disposition_is_failed(pn_delivery_remote(delivery)));
+ if (!trackingUndeliverableMessages) {
+ // observe queue for changes to track undeliverable messages
+ queue->getObservers().add(
+ boost::dynamic_pointer_cast<OutgoingFromQueue>(shared_from_this()));
+ trackingUndeliverableMessages = true;
+ }
+
+ undeliverableMessages.add(r.msg.getSequence());
}
+
+ queue->release(r.cursor, pn_disposition_is_failed(pn_delivery_remote(delivery)));
}
outgoingMessageRejected();//TODO: not quite true...
break;
@@ -168,6 +175,13 @@ bool OutgoingFromQueue::canDeliver()
void OutgoingFromQueue::detached(bool closed)
{
QPID_LOG(debug, "Detaching outgoing link " << getName() << " from " << queue->getName());
+
+ if (trackingUndeliverableMessages) {
+ // stop observation of the queue
+ queue->getObservers().remove(
+ boost::dynamic_pointer_cast<OutgoingFromQueue>(shared_from_this()));
+ }
+
queue->cancel(shared_from_this());
//TODO: release in a clearer order?
for (size_t i = 0 ; i < deliveries.capacity(); ++i) {
@@ -279,6 +293,7 @@ bool match(const std::string& filter, const std::string& target)
bool OutgoingFromQueue::filter(const qpid::broker::Message& m)
{
+ if (undeliverableMessages.contains(m.getSequence())) return false;
return (subjectFilter.empty() || subjectFilter == m.getRoutingKey() || match(subjectFilter, m.getRoutingKey()))
&& (!selector || selector->filter(m));
}
@@ -334,5 +349,11 @@ boost::shared_ptr<Queue> OutgoingFromQueue::getExclusiveSubscriptionQueue(Outgoi
else return boost::shared_ptr<Queue>();
}
+void OutgoingFromQueue::dequeued(const qpid::broker::Message &m)
+{
+ if (undeliverableMessages.contains(m.getSequence())) {
+ undeliverableMessages.remove(m.getSequence());
+ }
+}
}}} // namespace qpid::broker::amqp