summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp31
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.h26
2 files changed, 50 insertions, 7 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
index 90c268418f..20f32a1b37 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
@@ -69,7 +69,8 @@ OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source,
buffer(1024)/*used only for header at present*/,
//for exclusive queues, assume unreliable unless reliable is explicitly requested; otherwise assume reliable unless unreliable requested
unreliable(exclusive ? !requested_reliable(link) : requested_unreliable(link)),
- cancelled(false)
+ cancelled(false),
+ trackingUndeliverableMessages(false)
{
for (size_t i = 0 ; i < deliveries.capacity(); ++i) {
deliveries[i].init(i);
@@ -142,11 +143,17 @@ void OutgoingFromQueue::handle(pn_delivery_t* delivery)
if (preAcquires()) {
//TODO: handle message-annotations
if (pn_disposition_is_undeliverable(pn_delivery_remote(delivery))) {
- //treat undeliverable here as rejection
- queue->reject(r.cursor);
- } else {
- queue->release(r.cursor, pn_disposition_is_failed(pn_delivery_remote(delivery)));
+ if (!trackingUndeliverableMessages) {
+ // observe queue for changes to track undeliverable messages
+ queue->getObservers().add(
+ boost::dynamic_pointer_cast<OutgoingFromQueue>(shared_from_this()));
+ trackingUndeliverableMessages = true;
+ }
+
+ undeliverableMessages.add(r.msg.getSequence());
}
+
+ queue->release(r.cursor, pn_disposition_is_failed(pn_delivery_remote(delivery)));
}
outgoingMessageRejected();//TODO: not quite true...
break;
@@ -168,6 +175,13 @@ bool OutgoingFromQueue::canDeliver()
void OutgoingFromQueue::detached(bool closed)
{
QPID_LOG(debug, "Detaching outgoing link " << getName() << " from " << queue->getName());
+
+ if (trackingUndeliverableMessages) {
+ // stop observation of the queue
+ queue->getObservers().remove(
+ boost::dynamic_pointer_cast<OutgoingFromQueue>(shared_from_this()));
+ }
+
queue->cancel(shared_from_this());
//TODO: release in a clearer order?
for (size_t i = 0 ; i < deliveries.capacity(); ++i) {
@@ -279,6 +293,7 @@ bool match(const std::string& filter, const std::string& target)
bool OutgoingFromQueue::filter(const qpid::broker::Message& m)
{
+ if (undeliverableMessages.contains(m.getSequence())) return false;
return (subjectFilter.empty() || subjectFilter == m.getRoutingKey() || match(subjectFilter, m.getRoutingKey()))
&& (!selector || selector->filter(m));
}
@@ -334,5 +349,11 @@ boost::shared_ptr<Queue> OutgoingFromQueue::getExclusiveSubscriptionQueue(Outgoi
else return boost::shared_ptr<Queue>();
}
+void OutgoingFromQueue::dequeued(const qpid::broker::Message &m)
+{
+ if (undeliverableMessages.contains(m.getSequence())) {
+ undeliverableMessages.remove(m.getSequence());
+ }
+}
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
index d3825d0894..c56c8c0bf3 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
@@ -24,6 +24,7 @@
#include "qpid/broker/amqp/Message.h"
#include "qpid/broker/amqp/ManagedOutgoingLink.h"
#include "qpid/broker/Consumer.h"
+#include "qpid/broker/QueueObserver.h"
#include <boost/shared_ptr.hpp>
#include <boost/scoped_ptr.hpp>
@@ -36,12 +37,19 @@ namespace qpid {
namespace sys {
class OutputControl;
}
+
+namespace framing {
+class SequenceSet;
+}
+
namespace broker {
class Broker;
class Queue;
class Selector;
+
namespace amqp {
class Session;
+
template <class T>
class CircularArray
{
@@ -75,6 +83,7 @@ class Outgoing : public ManagedOutgoingLink
* Called when a delivery is writable
*/
virtual void handle(pn_delivery_t* delivery) = 0;
+
void wakeup();
virtual ~Outgoing() {}
protected:
@@ -85,7 +94,9 @@ class Outgoing : public ManagedOutgoingLink
* Logic for handling an outgoing link from a queue (even if it is a
* subscription pseduo-queue created by the broker)
*/
-class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public boost::enable_shared_from_this<OutgoingFromQueue>
+class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer,
+ public boost::enable_shared_from_this<OutgoingFromQueue>,
+ public qpid::broker::QueueObserver
{
public:
OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session&,
@@ -100,7 +111,7 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public
bool canDeliver();
void detached(bool closed);
- //Consumer interface:
+ // Consumer interface:
bool deliver(const QueueCursor& cursor, const qpid::broker::Message& msg);
void notify();
bool accept(const qpid::broker::Message&);
@@ -110,6 +121,14 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public
qpid::broker::OwnershipToken* getSession();
static boost::shared_ptr<Queue> getExclusiveSubscriptionQueue(Outgoing*);
+ // QueueObserver interface
+ virtual void enqueued(const qpid::broker::Message&) {};
+ virtual void acquired(const qpid::broker::Message&) {};
+ virtual void requeued(const qpid::broker::Message&) {};
+ virtual void dequeued(const qpid::broker::Message&);
+ virtual void consumerAdded(const qpid::broker::Consumer&) {};
+ virtual void consumerRemoved(const qpid::broker::Consumer&) {};
+
private:
struct Record
@@ -145,6 +164,9 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public
boost::scoped_ptr<Selector> selector;
bool unreliable;
bool cancelled;
+
+ bool trackingUndeliverableMessages;
+ qpid::framing::SequenceSet undeliverableMessages;
};
}}} // namespace qpid::broker::amqp