summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp50
1 files changed, 29 insertions, 21 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
index eb0a6e20aa..e531e8cd20 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
@@ -20,6 +20,7 @@
*/
#include "qpid/broker/amqp/Outgoing.h"
#include "qpid/broker/amqp/Header.h"
+#include "qpid/broker/amqp/Session.h"
#include "qpid/broker/amqp/Translation.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/Selector.h"
@@ -32,9 +33,16 @@ namespace qpid {
namespace broker {
namespace amqp {
-Outgoing::Outgoing(Broker& broker, boost::shared_ptr<Queue> q, pn_link_t* l, ManagedSession& session, qpid::sys::OutputControl& o, bool topic)
- : Consumer(pn_link_name(l), /*FIXME*/CONSUMER),
- ManagedOutgoingLink(broker, *q, session, pn_link_name(l), topic),
+Outgoing::Outgoing(Broker& broker, Session& parent, const std::string& source, const std::string& name) : ManagedOutgoingLink(broker, parent, source, name), session(parent) {}
+
+void Outgoing::wakeup()
+{
+ session.wakeup();
+}
+
+OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session, qpid::sys::OutputControl& o, bool topic)
+ : Outgoing(broker, session, source, pn_link_name(l)),
+ Consumer(pn_link_name(l), /*FIXME*/CONSUMER),
exclusive(topic),
queue(q), deliveries(5000), link(l), out(o),
current(0), outstanding(0),
@@ -45,12 +53,12 @@ Outgoing::Outgoing(Broker& broker, boost::shared_ptr<Queue> q, pn_link_t* l, Man
}
}
-void Outgoing::init()
+void OutgoingFromQueue::init()
{
queue->consume(shared_from_this(), exclusive);//may throw exception
}
-bool Outgoing::dispatch()
+bool OutgoingFromQueue::doWork()
{
QPID_LOG(trace, "Dispatching to " << getName() << ": " << pn_link_credit(link));
if (canDeliver()) {
@@ -66,12 +74,12 @@ bool Outgoing::dispatch()
return false;
}
-void Outgoing::write(const char* data, size_t size)
+void OutgoingFromQueue::write(const char* data, size_t size)
{
pn_link_send(link, data, size);
}
-void Outgoing::handle(pn_delivery_t* delivery)
+void OutgoingFromQueue::handle(pn_delivery_t* delivery)
{
pn_delivery_tag_t tag = pn_delivery_tag(delivery);
size_t i = *reinterpret_cast<const size_t*>(tag.bytes);
@@ -126,12 +134,12 @@ void Outgoing::handle(pn_delivery_t* delivery)
}
}
-bool Outgoing::canDeliver()
+bool OutgoingFromQueue::canDeliver()
{
return deliveries[current].delivery == 0 && pn_link_credit(link) > outstanding;
}
-void Outgoing::detached()
+void OutgoingFromQueue::detached()
{
QPID_LOG(debug, "Detaching outgoing link from " << queue->getName());
queue->cancel(shared_from_this());
@@ -143,7 +151,7 @@ void Outgoing::detached()
}
//Consumer interface:
-bool Outgoing::deliver(const QueueCursor& cursor, const qpid::broker::Message& msg)
+bool OutgoingFromQueue::deliver(const QueueCursor& cursor, const qpid::broker::Message& msg)
{
Record& r = deliveries[current++];
if (current >= deliveries.capacity()) current = 0;
@@ -155,23 +163,23 @@ bool Outgoing::deliver(const QueueCursor& cursor, const qpid::broker::Message& m
return true;
}
-void Outgoing::notify()
+void OutgoingFromQueue::notify()
{
QPID_LOG(trace, "Notification received for " << queue->getName());
out.activateOutput();
}
-bool Outgoing::accept(const qpid::broker::Message&)
+bool OutgoingFromQueue::accept(const qpid::broker::Message&)
{
return true;
}
-void Outgoing::setSubjectFilter(const std::string& f)
+void OutgoingFromQueue::setSubjectFilter(const std::string& f)
{
subjectFilter = f;
}
-void Outgoing::setSelectorFilter(const std::string& f)
+void OutgoingFromQueue::setSelectorFilter(const std::string& f)
{
selector.reset(new Selector(f));
}
@@ -217,29 +225,29 @@ bool match(const std::string& filter, const std::string& target)
}
}
-bool Outgoing::filter(const qpid::broker::Message& m)
+bool OutgoingFromQueue::filter(const qpid::broker::Message& m)
{
return (subjectFilter.empty() || subjectFilter == m.getRoutingKey() || match(subjectFilter, m.getRoutingKey()))
&& (!selector || selector->filter(m));
}
-void Outgoing::cancel() {}
+void OutgoingFromQueue::cancel() {}
-void Outgoing::acknowledged(const qpid::broker::DeliveryRecord&) {}
+void OutgoingFromQueue::acknowledged(const qpid::broker::DeliveryRecord&) {}
-qpid::broker::OwnershipToken* Outgoing::getSession()
+qpid::broker::OwnershipToken* OutgoingFromQueue::getSession()
{
return 0;
}
-Outgoing::Record::Record() : delivery(0), disposition(0), index(0) {}
-void Outgoing::Record::init(size_t i)
+OutgoingFromQueue::Record::Record() : delivery(0), disposition(0), index(0) {}
+void OutgoingFromQueue::Record::init(size_t i)
{
index = i;
tag.bytes = reinterpret_cast<const char*>(&index);
tag.size = sizeof(index);
}
-void Outgoing::Record::reset()
+void OutgoingFromQueue::Record::reset()
{
cursor = QueueCursor();
msg = qpid::broker::Message();