diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp | 31 |
1 files changed, 26 insertions, 5 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp index 90c268418f..20f32a1b37 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp @@ -69,7 +69,8 @@ OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, buffer(1024)/*used only for header at present*/, //for exclusive queues, assume unreliable unless reliable is explicitly requested; otherwise assume reliable unless unreliable requested unreliable(exclusive ? !requested_reliable(link) : requested_unreliable(link)), - cancelled(false) + cancelled(false), + trackingUndeliverableMessages(false) { for (size_t i = 0 ; i < deliveries.capacity(); ++i) { deliveries[i].init(i); @@ -142,11 +143,17 @@ void OutgoingFromQueue::handle(pn_delivery_t* delivery) if (preAcquires()) { //TODO: handle message-annotations if (pn_disposition_is_undeliverable(pn_delivery_remote(delivery))) { - //treat undeliverable here as rejection - queue->reject(r.cursor); - } else { - queue->release(r.cursor, pn_disposition_is_failed(pn_delivery_remote(delivery))); + if (!trackingUndeliverableMessages) { + // observe queue for changes to track undeliverable messages + queue->getObservers().add( + boost::dynamic_pointer_cast<OutgoingFromQueue>(shared_from_this())); + trackingUndeliverableMessages = true; + } + + undeliverableMessages.add(r.msg.getSequence()); } + + queue->release(r.cursor, pn_disposition_is_failed(pn_delivery_remote(delivery))); } outgoingMessageRejected();//TODO: not quite true... break; @@ -168,6 +175,13 @@ bool OutgoingFromQueue::canDeliver() void OutgoingFromQueue::detached(bool closed) { QPID_LOG(debug, "Detaching outgoing link " << getName() << " from " << queue->getName()); + + if (trackingUndeliverableMessages) { + // stop observation of the queue + queue->getObservers().remove( + boost::dynamic_pointer_cast<OutgoingFromQueue>(shared_from_this())); + } + queue->cancel(shared_from_this()); //TODO: release in a clearer order? for (size_t i = 0 ; i < deliveries.capacity(); ++i) { @@ -279,6 +293,7 @@ bool match(const std::string& filter, const std::string& target) bool OutgoingFromQueue::filter(const qpid::broker::Message& m) { + if (undeliverableMessages.contains(m.getSequence())) return false; return (subjectFilter.empty() || subjectFilter == m.getRoutingKey() || match(subjectFilter, m.getRoutingKey())) && (!selector || selector->filter(m)); } @@ -334,5 +349,11 @@ boost::shared_ptr<Queue> OutgoingFromQueue::getExclusiveSubscriptionQueue(Outgoi else return boost::shared_ptr<Queue>(); } +void OutgoingFromQueue::dequeued(const qpid::broker::Message &m) +{ + if (undeliverableMessages.contains(m.getSequence())) { + undeliverableMessages.remove(m.getSequence()); + } +} }}} // namespace qpid::broker::amqp |