diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp | 50 |
1 files changed, 29 insertions, 21 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp index eb0a6e20aa..e531e8cd20 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp @@ -20,6 +20,7 @@ */ #include "qpid/broker/amqp/Outgoing.h" #include "qpid/broker/amqp/Header.h" +#include "qpid/broker/amqp/Session.h" #include "qpid/broker/amqp/Translation.h" #include "qpid/broker/Queue.h" #include "qpid/broker/Selector.h" @@ -32,9 +33,16 @@ namespace qpid { namespace broker { namespace amqp { -Outgoing::Outgoing(Broker& broker, boost::shared_ptr<Queue> q, pn_link_t* l, ManagedSession& session, qpid::sys::OutputControl& o, bool topic) - : Consumer(pn_link_name(l), /*FIXME*/CONSUMER), - ManagedOutgoingLink(broker, *q, session, pn_link_name(l), topic), +Outgoing::Outgoing(Broker& broker, Session& parent, const std::string& source, const std::string& name) : ManagedOutgoingLink(broker, parent, source, name), session(parent) {} + +void Outgoing::wakeup() +{ + session.wakeup(); +} + +OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session, qpid::sys::OutputControl& o, bool topic) + : Outgoing(broker, session, source, pn_link_name(l)), + Consumer(pn_link_name(l), /*FIXME*/CONSUMER), exclusive(topic), queue(q), deliveries(5000), link(l), out(o), current(0), outstanding(0), @@ -45,12 +53,12 @@ Outgoing::Outgoing(Broker& broker, boost::shared_ptr<Queue> q, pn_link_t* l, Man } } -void Outgoing::init() +void OutgoingFromQueue::init() { queue->consume(shared_from_this(), exclusive);//may throw exception } -bool Outgoing::dispatch() +bool OutgoingFromQueue::doWork() { QPID_LOG(trace, "Dispatching to " << getName() << ": " << pn_link_credit(link)); if (canDeliver()) { @@ -66,12 +74,12 @@ bool Outgoing::dispatch() return false; } -void Outgoing::write(const char* data, size_t size) +void OutgoingFromQueue::write(const char* data, size_t size) { pn_link_send(link, data, size); } -void Outgoing::handle(pn_delivery_t* delivery) +void OutgoingFromQueue::handle(pn_delivery_t* delivery) { pn_delivery_tag_t tag = pn_delivery_tag(delivery); size_t i = *reinterpret_cast<const size_t*>(tag.bytes); @@ -126,12 +134,12 @@ void Outgoing::handle(pn_delivery_t* delivery) } } -bool Outgoing::canDeliver() +bool OutgoingFromQueue::canDeliver() { return deliveries[current].delivery == 0 && pn_link_credit(link) > outstanding; } -void Outgoing::detached() +void OutgoingFromQueue::detached() { QPID_LOG(debug, "Detaching outgoing link from " << queue->getName()); queue->cancel(shared_from_this()); @@ -143,7 +151,7 @@ void Outgoing::detached() } //Consumer interface: -bool Outgoing::deliver(const QueueCursor& cursor, const qpid::broker::Message& msg) +bool OutgoingFromQueue::deliver(const QueueCursor& cursor, const qpid::broker::Message& msg) { Record& r = deliveries[current++]; if (current >= deliveries.capacity()) current = 0; @@ -155,23 +163,23 @@ bool Outgoing::deliver(const QueueCursor& cursor, const qpid::broker::Message& m return true; } -void Outgoing::notify() +void OutgoingFromQueue::notify() { QPID_LOG(trace, "Notification received for " << queue->getName()); out.activateOutput(); } -bool Outgoing::accept(const qpid::broker::Message&) +bool OutgoingFromQueue::accept(const qpid::broker::Message&) { return true; } -void Outgoing::setSubjectFilter(const std::string& f) +void OutgoingFromQueue::setSubjectFilter(const std::string& f) { subjectFilter = f; } -void Outgoing::setSelectorFilter(const std::string& f) +void OutgoingFromQueue::setSelectorFilter(const std::string& f) { selector.reset(new Selector(f)); } @@ -217,29 +225,29 @@ bool match(const std::string& filter, const std::string& target) } } -bool Outgoing::filter(const qpid::broker::Message& m) +bool OutgoingFromQueue::filter(const qpid::broker::Message& m) { return (subjectFilter.empty() || subjectFilter == m.getRoutingKey() || match(subjectFilter, m.getRoutingKey())) && (!selector || selector->filter(m)); } -void Outgoing::cancel() {} +void OutgoingFromQueue::cancel() {} -void Outgoing::acknowledged(const qpid::broker::DeliveryRecord&) {} +void OutgoingFromQueue::acknowledged(const qpid::broker::DeliveryRecord&) {} -qpid::broker::OwnershipToken* Outgoing::getSession() +qpid::broker::OwnershipToken* OutgoingFromQueue::getSession() { return 0; } -Outgoing::Record::Record() : delivery(0), disposition(0), index(0) {} -void Outgoing::Record::init(size_t i) +OutgoingFromQueue::Record::Record() : delivery(0), disposition(0), index(0) {} +void OutgoingFromQueue::Record::init(size_t i) { index = i; tag.bytes = reinterpret_cast<const char*>(&index); tag.size = sizeof(index); } -void Outgoing::Record::reset() +void OutgoingFromQueue::Record::reset() { cursor = QueueCursor(); msg = qpid::broker::Message(); |