summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2012-11-28 14:13:21 +0000
committerGordon Sim <gsim@apache.org>2012-11-28 14:13:21 +0000
commit5a4cc61cf5895271b46d34b089cd04bf93885590 (patch)
treee17d5824dca3947971d03d945807a0c6c82247ed
parentd74aa0673954287cc39523e6da07bc43c39d95e4 (diff)
downloadqpid-python-5a4cc61cf5895271b46d34b089cd04bf93885590.tar.gz
QPID-4452: fix credit for producers
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.20@1414706 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.cpp50
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp2
2 files changed, 31 insertions, 21 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
index 760fa2d902..fabe609473 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
@@ -53,32 +53,33 @@ namespace amqp {
class Target
{
public:
+ Target(pn_link_t* l) : credit(100), window(0), link(l) {}
virtual ~Target() {}
- virtual void flow() = 0;
+ bool flow();
+ bool needFlow();
virtual void handle(qpid::broker::Message& m) = 0;//TODO: revise this for proper message
- private:
+ protected:
+ const uint32_t credit;
+ uint32_t window;
+ pn_link_t* link;
};
class Queue : public Target
{
public:
- Queue(boost::shared_ptr<qpid::broker::Queue> q, pn_link_t* l) : queue(q), link(l) {}
- void flow();
+ Queue(boost::shared_ptr<qpid::broker::Queue> q, pn_link_t* l) : Target(l), queue(q) {}
void handle(qpid::broker::Message& m);
private:
boost::shared_ptr<qpid::broker::Queue> queue;
- pn_link_t* link;
};
class Exchange : public Target
{
public:
- Exchange(boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l) : exchange(e), link(l) {}
- void flow();
+ Exchange(boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l) : Target(l), exchange(e) {}
void handle(qpid::broker::Message& m);
private:
boost::shared_ptr<qpid::broker::Exchange> exchange;
- pn_link_t* link;
};
Session::Session(pn_session_t* s, qpid::broker::Broker& b, ManagedConnection& c, qpid::sys::OutputControl& o)
@@ -169,11 +170,9 @@ void Session::attach(pn_link_t* link)
if (node.queue) {
boost::shared_ptr<Target> q(new Queue(node.queue, link));
targets[link] = q;
- q->flow();
} else if (node.exchange) {
boost::shared_ptr<Target> e(new Exchange(node.exchange, link));
targets[link] = e;
- e->flow();
} else {
pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED);
throw qpid::Exception("Node not found: " + name);/*not-found*/
@@ -253,7 +252,7 @@ void Session::incoming(pn_link_t* link, pn_delivery_t* delivery)
received->begin();
Transfer t(delivery, shared_from_this());
received->end(t);
- target->second->flow();
+ if (target->second->needFlow()) out.activateOutput();
}
}
void Session::outgoing(pn_link_t* link, pn_delivery_t* delivery)
@@ -283,6 +282,9 @@ bool Session::dispatch()
accepted(*i, true);
}
}
+ for (Targets::iterator t = targets.begin(); t != targets.end(); ++t) {
+ if (t->second->flow()) output = true;
+ }
return output;
}
@@ -299,24 +301,32 @@ void Session::close()
deleted = true;
}
-void Queue::flow()
+void Queue::handle(qpid::broker::Message& message)
{
- pn_link_flow(link, 1);//TODO: proper flow control
+ queue->deliver(message);
+ --window;
}
-void Queue::handle(qpid::broker::Message& message)
+void Exchange::handle(qpid::broker::Message& message)
{
- queue->deliver(message);
+ DeliverableMessage deliverable(message, 0);
+ exchange->route(deliverable);
+ --window;
}
-void Exchange::flow()
+bool Target::flow()
{
- pn_link_flow(link, 1);//TODO: proper flow control
+ bool issue = window < credit;
+ if (issue) {
+ pn_link_flow(link, credit - window);//TODO: proper flow control
+ window = credit;
+ }
+ return issue;
}
-void Exchange::handle(qpid::broker::Message& message)
+bool Target::needFlow()
{
- DeliverableMessage deliverable(message, 0);
- exchange->route(deliverable);
+ return window <= (credit/2);
}
+
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
index e15c645e2c..9dc7c25121 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
@@ -80,7 +80,7 @@ const std::string& SenderContext::getTarget() const
SenderContext::Delivery* SenderContext::send(const qpid::messaging::Message& message)
{
- if (processUnsettled() < capacity) {
+ if (processUnsettled() < capacity && pn_link_credit(sender)) {
deliveries.push_back(Delivery(nextId++));
Delivery& delivery = deliveries.back();
delivery.encode(MessageImplAccess::get(message), address);