diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/amqp/Outgoing.h')
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Outgoing.h | 26 |
1 files changed, 24 insertions, 2 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h index d3825d0894..c56c8c0bf3 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h @@ -24,6 +24,7 @@ #include "qpid/broker/amqp/Message.h" #include "qpid/broker/amqp/ManagedOutgoingLink.h" #include "qpid/broker/Consumer.h" +#include "qpid/broker/QueueObserver.h" #include <boost/shared_ptr.hpp> #include <boost/scoped_ptr.hpp> @@ -36,12 +37,19 @@ namespace qpid { namespace sys { class OutputControl; } + +namespace framing { +class SequenceSet; +} + namespace broker { class Broker; class Queue; class Selector; + namespace amqp { class Session; + template <class T> class CircularArray { @@ -75,6 +83,7 @@ class Outgoing : public ManagedOutgoingLink * Called when a delivery is writable */ virtual void handle(pn_delivery_t* delivery) = 0; + void wakeup(); virtual ~Outgoing() {} protected: @@ -85,7 +94,9 @@ class Outgoing : public ManagedOutgoingLink * Logic for handling an outgoing link from a queue (even if it is a * subscription pseduo-queue created by the broker) */ -class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public boost::enable_shared_from_this<OutgoingFromQueue> +class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, + public boost::enable_shared_from_this<OutgoingFromQueue>, + public qpid::broker::QueueObserver { public: OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session&, @@ -100,7 +111,7 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public bool canDeliver(); void detached(bool closed); - //Consumer interface: + // Consumer interface: bool deliver(const QueueCursor& cursor, const qpid::broker::Message& msg); void notify(); bool accept(const qpid::broker::Message&); @@ -110,6 +121,14 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public qpid::broker::OwnershipToken* getSession(); static boost::shared_ptr<Queue> getExclusiveSubscriptionQueue(Outgoing*); + // QueueObserver interface + virtual void enqueued(const qpid::broker::Message&) {}; + virtual void acquired(const qpid::broker::Message&) {}; + virtual void requeued(const qpid::broker::Message&) {}; + virtual void dequeued(const qpid::broker::Message&); + virtual void consumerAdded(const qpid::broker::Consumer&) {}; + virtual void consumerRemoved(const qpid::broker::Consumer&) {}; + private: struct Record @@ -145,6 +164,9 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public boost::scoped_ptr<Selector> selector; bool unreliable; bool cancelled; + + bool trackingUndeliverableMessages; + qpid::framing::SequenceSet undeliverableMessages; }; }}} // namespace qpid::broker::amqp |