From 5a4cc61cf5895271b46d34b089cd04bf93885590 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Wed, 28 Nov 2012 14:13:21 +0000 Subject: QPID-4452: fix credit for producers git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.20@1414706 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/amqp/Session.cpp | 50 +++++++++++++--------- qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp | 2 +- 2 files changed, 31 insertions(+), 21 deletions(-) diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index 760fa2d902..fabe609473 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -53,32 +53,33 @@ namespace amqp { class Target { public: + Target(pn_link_t* l) : credit(100), window(0), link(l) {} virtual ~Target() {} - virtual void flow() = 0; + bool flow(); + bool needFlow(); virtual void handle(qpid::broker::Message& m) = 0;//TODO: revise this for proper message - private: + protected: + const uint32_t credit; + uint32_t window; + pn_link_t* link; }; class Queue : public Target { public: - Queue(boost::shared_ptr q, pn_link_t* l) : queue(q), link(l) {} - void flow(); + Queue(boost::shared_ptr q, pn_link_t* l) : Target(l), queue(q) {} void handle(qpid::broker::Message& m); private: boost::shared_ptr queue; - pn_link_t* link; }; class Exchange : public Target { public: - Exchange(boost::shared_ptr e, pn_link_t* l) : exchange(e), link(l) {} - void flow(); + Exchange(boost::shared_ptr e, pn_link_t* l) : Target(l), exchange(e) {} void handle(qpid::broker::Message& m); private: boost::shared_ptr exchange; - pn_link_t* link; }; Session::Session(pn_session_t* s, qpid::broker::Broker& b, ManagedConnection& c, qpid::sys::OutputControl& o) @@ -169,11 +170,9 @@ void Session::attach(pn_link_t* link) if (node.queue) { boost::shared_ptr q(new Queue(node.queue, link)); targets[link] = q; - q->flow(); } else if (node.exchange) { boost::shared_ptr e(new Exchange(node.exchange, link)); targets[link] = e; - e->flow(); } else { pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED); throw qpid::Exception("Node not found: " + name);/*not-found*/ @@ -253,7 +252,7 @@ void Session::incoming(pn_link_t* link, pn_delivery_t* delivery) received->begin(); Transfer t(delivery, shared_from_this()); received->end(t); - target->second->flow(); + if (target->second->needFlow()) out.activateOutput(); } } void Session::outgoing(pn_link_t* link, pn_delivery_t* delivery) @@ -283,6 +282,9 @@ bool Session::dispatch() accepted(*i, true); } } + for (Targets::iterator t = targets.begin(); t != targets.end(); ++t) { + if (t->second->flow()) output = true; + } return output; } @@ -299,24 +301,32 @@ void Session::close() deleted = true; } -void Queue::flow() +void Queue::handle(qpid::broker::Message& message) { - pn_link_flow(link, 1);//TODO: proper flow control + queue->deliver(message); + --window; } -void Queue::handle(qpid::broker::Message& message) +void Exchange::handle(qpid::broker::Message& message) { - queue->deliver(message); + DeliverableMessage deliverable(message, 0); + exchange->route(deliverable); + --window; } -void Exchange::flow() +bool Target::flow() { - pn_link_flow(link, 1);//TODO: proper flow control + bool issue = window < credit; + if (issue) { + pn_link_flow(link, credit - window);//TODO: proper flow control + window = credit; + } + return issue; } -void Exchange::handle(qpid::broker::Message& message) +bool Target::needFlow() { - DeliverableMessage deliverable(message, 0); - exchange->route(deliverable); + return window <= (credit/2); } + }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp index e15c645e2c..9dc7c25121 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp @@ -80,7 +80,7 @@ const std::string& SenderContext::getTarget() const SenderContext::Delivery* SenderContext::send(const qpid::messaging::Message& message) { - if (processUnsettled() < capacity) { + if (processUnsettled() < capacity && pn_link_credit(sender)) { deliveries.push_back(Delivery(nextId++)); Delivery& delivery = deliveries.back(); delivery.encode(MessageImplAccess::get(message), address); -- cgit v1.2.1