summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp8
-rw-r--r--cpp/src/qpid/cluster/Cluster.h2
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp19
-rw-r--r--cpp/src/qpid/cluster/Multicaster.cpp32
-rw-r--r--cpp/src/qpid/cluster/Multicaster.h11
-rw-r--r--cpp/src/qpid/cluster/OutputInterceptor.cpp2
6 files changed, 57 insertions, 17 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index ef6285481c..52f5e4872d 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -83,7 +83,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
};
-Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, bool quorum_, size_t readMax_, size_t writeEstimate_) :
+Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, bool quorum_, size_t readMax_, size_t writeEstimate_, size_t mcastMax) :
broker(b),
poller(b.getPoller()),
cpg(*this),
@@ -98,7 +98,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
0, // write
boost::bind(&Cluster::disconnect, this, _1) // disconnect
),
- mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
+ mcast(cpg, mcastMax, poller, boost::bind(&Cluster::leave, this)),
mgmtObject(0),
deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller),
state(INIT),
@@ -193,7 +193,9 @@ void Cluster::deliver(
void Cluster::deliver(const Event& e, Lock&) {
if (state == LEFT) return;
QPID_LOG(trace, *this << " PUSH: " << e);
- deliverQueue.push(e); // Otherwise enqueue for processing.
+ if (e.getMemberId() == myId)
+ mcast.delivered(e); // Note delivery for flow control
+ deliverQueue.push(e);
}
// Entry point: called when deliverQueue has events to process.
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index b8fe61bf15..097955ebaa 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -70,7 +70,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
* Join a cluster.
*/
Cluster(const std::string& name, const Url& url, broker::Broker&, bool useQuorum,
- size_t readMax, size_t writeEstimate);
+ size_t readMax, size_t writeEstimate, size_t mcastMax);
virtual ~Cluster();
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 783e4e5009..7f3a9ac6aa 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -41,10 +41,10 @@ struct ClusterValues {
string name;
string url;
bool quorum;
- size_t readMax, writeEstimate;
+ size_t readMax, writeEstimate, mcastMax;
// FIXME aconway 2008-12-09: revisit default.
- ClusterValues() : quorum(false), readMax(0), writeEstimate(64) {}
+ ClusterValues() : quorum(false), readMax(10), writeEstimate(64), mcastMax(10) {}
Url getUrl(uint16_t port) const {
if (url.empty()) return Url::getIpAddressesUrl(port);
@@ -69,10 +69,11 @@ struct ClusterOptions : public Options {
("cluster-cman", optValue(values.quorum), "Integrate with Cluster Manager (CMAN) cluster.")
#endif
("cluster-read-max", optValue(values.readMax,"N"),
- "Experimental: Throttle read rate from client connections.")
+ "Experimental: Max unreplicated reads per connetion connection. 0=no limit.")
+ ("cluster-mcast-max", optValue(values.mcastMax,"N"),
+ "Experimental: Max outstanding multicasts per broker. 0=no limit.")
("cluster-write-estimate", optValue(values.writeEstimate, "Kb"),
- "Experimental: initial estimate for connection write per multicast cycle")
- ;
+ "Experimental: initial estimate for connection writes rate per multicast cycle");
}
};
@@ -91,7 +92,13 @@ struct ClusterPlugin : public Plugin {
if (values.name.empty()) return; // Only if --cluster-name option was specified.
Broker* broker = dynamic_cast<Broker*>(&target);
if (!broker) return;
- cluster = new Cluster(values.name, values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)), *broker, values.quorum, values.readMax, values.writeEstimate*1024);
+ cluster = new Cluster(
+ values.name,
+ values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)),
+ *broker,
+ values.quorum,
+ values.readMax, values.writeEstimate*1024, values.mcastMax
+ );
broker->setConnectionFactory(
boost::shared_ptr<sys::ConnectionCodec::Factory>(
new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster)));
diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp
index a106ec128b..b02fa16ae9 100644
--- a/cpp/src/qpid/cluster/Multicaster.cpp
+++ b/cpp/src/qpid/cluster/Multicaster.cpp
@@ -28,10 +28,14 @@
namespace qpid {
namespace cluster {
-Multicaster::Multicaster(Cpg& cpg_, const boost::shared_ptr<sys::Poller>& poller, boost::function<void()> onError_) :
- onError(onError_), cpg(cpg_),
+Multicaster::Multicaster(Cpg& cpg_, size_t mcastMax_,
+ const boost::shared_ptr<sys::Poller>& poller,
+ boost::function<void()> onError_) :
+ onError(onError_), cpg(cpg_),
queue(boost::bind(&Multicaster::sendMcast, this, _1), poller),
- holding(true)
+ holding(true),
+ mcastMax(mcastMax_),
+ pending(0)
{
queue.start();
}
@@ -56,6 +60,7 @@ void Multicaster::mcast(const Event& e) {
}
}
queue.push(e);
+
}
@@ -64,9 +69,18 @@ void Multicaster::sendMcast(PollableEventQueue::Queue& values) {
PollableEventQueue::Queue::iterator i = values.begin();
while( i != values.end()) {
iovec iov = { const_cast<char*>(i->getStore()), i->getStoreSize() };
- if (!cpg.mcast(&iov, 1)) break; // returns false for flow control
+ if (!cpg.mcast(&iov, 1))
+ break; // cpg.mcast returns false for flow control
QPID_LOG(trace, " MCAST " << *i);
++i;
+ if (mcastMax) {
+ sys::Mutex::ScopedLock l(lock);
+ assert(pending < mcastMax);
+ if (++pending == mcastMax) {
+ queue.stop();
+ break ;
+ }
+ }
}
values.erase(values.begin(), i);
}
@@ -84,4 +98,14 @@ void Multicaster::release() {
holdingQueue.clear();
}
+void Multicaster::delivered(const Event&) {
+ sys::Mutex::ScopedLock l(lock);
+ if (mcastMax) {
+ assert(pending <= mcastMax);
+ if (pending == mcastMax)
+ queue.start();
+ --pending;
+ }
+}
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Multicaster.h b/cpp/src/qpid/cluster/Multicaster.h
index a63bbc2180..ef8be8c229 100644
--- a/cpp/src/qpid/cluster/Multicaster.h
+++ b/cpp/src/qpid/cluster/Multicaster.h
@@ -46,13 +46,19 @@ class Multicaster
{
public:
/** Starts in holding mode: connection data events are held, other events are mcast */
- Multicaster(Cpg& cpg_, const boost::shared_ptr<sys::Poller>&, boost::function<void()> onError );
+ Multicaster(Cpg& cpg_,
+ size_t mcastMax,
+ const boost::shared_ptr<sys::Poller>&,
+ boost::function<void()> onError
+ );
void mcastControl(const framing::AMQBody& controlBody, const ConnectionId&);
void mcastBuffer(const char*, size_t, const ConnectionId&);
void mcast(const Event& e);
/** End holding mode, held events are mcast */
void release();
-
+ /** Call when events are self-delivered to manage flow control. */
+ void delivered(const Event& e);
+
private:
typedef sys::PollableQueue<Event> PollableEventQueue;
typedef std::deque<Event> PlainEventQueue;
@@ -66,6 +72,7 @@ class Multicaster
bool holding;
PlainEventQueue holdingQueue;
std::vector<struct ::iovec> ioVector;
+ size_t mcastMax, pending;
};
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp
index f37dda7738..1565ef2efb 100644
--- a/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -77,7 +77,7 @@ bool OutputInterceptor::doOutput() {
// which tranfers frames to the codec for writing.
//
void OutputInterceptor::deliverDoOutput(size_t requested) {
- size_t buf = next->getBuffered();
+ size_t buf = getBuffered();
if (parent.isLocal())
writeEstimate.delivered(requested, sent, buf); // Update the estimate.