diff options
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp | 31 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Outgoing.h | 26 |
2 files changed, 50 insertions, 7 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp index 90c268418f..20f32a1b37 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp @@ -69,7 +69,8 @@ OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, buffer(1024)/*used only for header at present*/, //for exclusive queues, assume unreliable unless reliable is explicitly requested; otherwise assume reliable unless unreliable requested unreliable(exclusive ? !requested_reliable(link) : requested_unreliable(link)), - cancelled(false) + cancelled(false), + trackingUndeliverableMessages(false) { for (size_t i = 0 ; i < deliveries.capacity(); ++i) { deliveries[i].init(i); @@ -142,11 +143,17 @@ void OutgoingFromQueue::handle(pn_delivery_t* delivery) if (preAcquires()) { //TODO: handle message-annotations if (pn_disposition_is_undeliverable(pn_delivery_remote(delivery))) { - //treat undeliverable here as rejection - queue->reject(r.cursor); - } else { - queue->release(r.cursor, pn_disposition_is_failed(pn_delivery_remote(delivery))); + if (!trackingUndeliverableMessages) { + // observe queue for changes to track undeliverable messages + queue->getObservers().add( + boost::dynamic_pointer_cast<OutgoingFromQueue>(shared_from_this())); + trackingUndeliverableMessages = true; + } + + undeliverableMessages.add(r.msg.getSequence()); } + + queue->release(r.cursor, pn_disposition_is_failed(pn_delivery_remote(delivery))); } outgoingMessageRejected();//TODO: not quite true... break; @@ -168,6 +175,13 @@ bool OutgoingFromQueue::canDeliver() void OutgoingFromQueue::detached(bool closed) { QPID_LOG(debug, "Detaching outgoing link " << getName() << " from " << queue->getName()); + + if (trackingUndeliverableMessages) { + // stop observation of the queue + queue->getObservers().remove( + boost::dynamic_pointer_cast<OutgoingFromQueue>(shared_from_this())); + } + queue->cancel(shared_from_this()); //TODO: release in a clearer order? for (size_t i = 0 ; i < deliveries.capacity(); ++i) { @@ -279,6 +293,7 @@ bool match(const std::string& filter, const std::string& target) bool OutgoingFromQueue::filter(const qpid::broker::Message& m) { + if (undeliverableMessages.contains(m.getSequence())) return false; return (subjectFilter.empty() || subjectFilter == m.getRoutingKey() || match(subjectFilter, m.getRoutingKey())) && (!selector || selector->filter(m)); } @@ -334,5 +349,11 @@ boost::shared_ptr<Queue> OutgoingFromQueue::getExclusiveSubscriptionQueue(Outgoi else return boost::shared_ptr<Queue>(); } +void OutgoingFromQueue::dequeued(const qpid::broker::Message &m) +{ + if (undeliverableMessages.contains(m.getSequence())) { + undeliverableMessages.remove(m.getSequence()); + } +} }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h index d3825d0894..c56c8c0bf3 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h @@ -24,6 +24,7 @@ #include "qpid/broker/amqp/Message.h" #include "qpid/broker/amqp/ManagedOutgoingLink.h" #include "qpid/broker/Consumer.h" +#include "qpid/broker/QueueObserver.h" #include <boost/shared_ptr.hpp> #include <boost/scoped_ptr.hpp> @@ -36,12 +37,19 @@ namespace qpid { namespace sys { class OutputControl; } + +namespace framing { +class SequenceSet; +} + namespace broker { class Broker; class Queue; class Selector; + namespace amqp { class Session; + template <class T> class CircularArray { @@ -75,6 +83,7 @@ class Outgoing : public ManagedOutgoingLink * Called when a delivery is writable */ virtual void handle(pn_delivery_t* delivery) = 0; + void wakeup(); virtual ~Outgoing() {} protected: @@ -85,7 +94,9 @@ class Outgoing : public ManagedOutgoingLink * Logic for handling an outgoing link from a queue (even if it is a * subscription pseduo-queue created by the broker) */ -class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public boost::enable_shared_from_this<OutgoingFromQueue> +class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, + public boost::enable_shared_from_this<OutgoingFromQueue>, + public qpid::broker::QueueObserver { public: OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session&, @@ -100,7 +111,7 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public bool canDeliver(); void detached(bool closed); - //Consumer interface: + // Consumer interface: bool deliver(const QueueCursor& cursor, const qpid::broker::Message& msg); void notify(); bool accept(const qpid::broker::Message&); @@ -110,6 +121,14 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public qpid::broker::OwnershipToken* getSession(); static boost::shared_ptr<Queue> getExclusiveSubscriptionQueue(Outgoing*); + // QueueObserver interface + virtual void enqueued(const qpid::broker::Message&) {}; + virtual void acquired(const qpid::broker::Message&) {}; + virtual void requeued(const qpid::broker::Message&) {}; + virtual void dequeued(const qpid::broker::Message&); + virtual void consumerAdded(const qpid::broker::Consumer&) {}; + virtual void consumerRemoved(const qpid::broker::Consumer&) {}; + private: struct Record @@ -145,6 +164,9 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public boost::scoped_ptr<Selector> selector; bool unreliable; bool cancelled; + + bool trackingUndeliverableMessages; + qpid::framing::SequenceSet undeliverableMessages; }; }}} // namespace qpid::broker::amqp |