summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp31
1 files changed, 26 insertions, 5 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
index 90c268418f..20f32a1b37 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
@@ -69,7 +69,8 @@ OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source,
buffer(1024)/*used only for header at present*/,
//for exclusive queues, assume unreliable unless reliable is explicitly requested; otherwise assume reliable unless unreliable requested
unreliable(exclusive ? !requested_reliable(link) : requested_unreliable(link)),
- cancelled(false)
+ cancelled(false),
+ trackingUndeliverableMessages(false)
{
for (size_t i = 0 ; i < deliveries.capacity(); ++i) {
deliveries[i].init(i);
@@ -142,11 +143,17 @@ void OutgoingFromQueue::handle(pn_delivery_t* delivery)
if (preAcquires()) {
//TODO: handle message-annotations
if (pn_disposition_is_undeliverable(pn_delivery_remote(delivery))) {
- //treat undeliverable here as rejection
- queue->reject(r.cursor);
- } else {
- queue->release(r.cursor, pn_disposition_is_failed(pn_delivery_remote(delivery)));
+ if (!trackingUndeliverableMessages) {
+ // observe queue for changes to track undeliverable messages
+ queue->getObservers().add(
+ boost::dynamic_pointer_cast<OutgoingFromQueue>(shared_from_this()));
+ trackingUndeliverableMessages = true;
+ }
+
+ undeliverableMessages.add(r.msg.getSequence());
}
+
+ queue->release(r.cursor, pn_disposition_is_failed(pn_delivery_remote(delivery)));
}
outgoingMessageRejected();//TODO: not quite true...
break;
@@ -168,6 +175,13 @@ bool OutgoingFromQueue::canDeliver()
void OutgoingFromQueue::detached(bool closed)
{
QPID_LOG(debug, "Detaching outgoing link " << getName() << " from " << queue->getName());
+
+ if (trackingUndeliverableMessages) {
+ // stop observation of the queue
+ queue->getObservers().remove(
+ boost::dynamic_pointer_cast<OutgoingFromQueue>(shared_from_this()));
+ }
+
queue->cancel(shared_from_this());
//TODO: release in a clearer order?
for (size_t i = 0 ; i < deliveries.capacity(); ++i) {
@@ -279,6 +293,7 @@ bool match(const std::string& filter, const std::string& target)
bool OutgoingFromQueue::filter(const qpid::broker::Message& m)
{
+ if (undeliverableMessages.contains(m.getSequence())) return false;
return (subjectFilter.empty() || subjectFilter == m.getRoutingKey() || match(subjectFilter, m.getRoutingKey()))
&& (!selector || selector->filter(m));
}
@@ -334,5 +349,11 @@ boost::shared_ptr<Queue> OutgoingFromQueue::getExclusiveSubscriptionQueue(Outgoi
else return boost::shared_ptr<Queue>();
}
+void OutgoingFromQueue::dequeued(const qpid::broker::Message &m)
+{
+ if (undeliverableMessages.contains(m.getSequence())) {
+ undeliverableMessages.remove(m.getSequence());
+ }
+}
}}} // namespace qpid::broker::amqp