summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/amqp/Outgoing.h')
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.h26
1 files changed, 24 insertions, 2 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
index d3825d0894..c56c8c0bf3 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
@@ -24,6 +24,7 @@
#include "qpid/broker/amqp/Message.h"
#include "qpid/broker/amqp/ManagedOutgoingLink.h"
#include "qpid/broker/Consumer.h"
+#include "qpid/broker/QueueObserver.h"
#include <boost/shared_ptr.hpp>
#include <boost/scoped_ptr.hpp>
@@ -36,12 +37,19 @@ namespace qpid {
namespace sys {
class OutputControl;
}
+
+namespace framing {
+class SequenceSet;
+}
+
namespace broker {
class Broker;
class Queue;
class Selector;
+
namespace amqp {
class Session;
+
template <class T>
class CircularArray
{
@@ -75,6 +83,7 @@ class Outgoing : public ManagedOutgoingLink
* Called when a delivery is writable
*/
virtual void handle(pn_delivery_t* delivery) = 0;
+
void wakeup();
virtual ~Outgoing() {}
protected:
@@ -85,7 +94,9 @@ class Outgoing : public ManagedOutgoingLink
* Logic for handling an outgoing link from a queue (even if it is a
* subscription pseduo-queue created by the broker)
*/
-class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public boost::enable_shared_from_this<OutgoingFromQueue>
+class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer,
+ public boost::enable_shared_from_this<OutgoingFromQueue>,
+ public qpid::broker::QueueObserver
{
public:
OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session&,
@@ -100,7 +111,7 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public
bool canDeliver();
void detached(bool closed);
- //Consumer interface:
+ // Consumer interface:
bool deliver(const QueueCursor& cursor, const qpid::broker::Message& msg);
void notify();
bool accept(const qpid::broker::Message&);
@@ -110,6 +121,14 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public
qpid::broker::OwnershipToken* getSession();
static boost::shared_ptr<Queue> getExclusiveSubscriptionQueue(Outgoing*);
+ // QueueObserver interface
+ virtual void enqueued(const qpid::broker::Message&) {};
+ virtual void acquired(const qpid::broker::Message&) {};
+ virtual void requeued(const qpid::broker::Message&) {};
+ virtual void dequeued(const qpid::broker::Message&);
+ virtual void consumerAdded(const qpid::broker::Consumer&) {};
+ virtual void consumerRemoved(const qpid::broker::Consumer&) {};
+
private:
struct Record
@@ -145,6 +164,9 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public
boost::scoped_ptr<Selector> selector;
bool unreliable;
bool cancelled;
+
+ bool trackingUndeliverableMessages;
+ qpid::framing::SequenceSet undeliverableMessages;
};
}}} // namespace qpid::broker::amqp