summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-08-21 18:04:18 +0000
committerAlan Conway <aconway@apache.org>2008-08-21 18:04:18 +0000
commit2b97a69197fb986c209339c48ed98bb45203e107 (patch)
tree8bd157cc9d19757b6d9c00c5ab2c353ca336f8bf /cpp/src/qpid/cluster
parentc6c237e2450250a6ef18c5af93e2a733aba10932 (diff)
downloadqpid-python-2b97a69197fb986c209339c48ed98bb45203e107.tar.gz
Pre-buffering output strategy for cluster.
Additional hooks in broker code, should not affect standalone broker. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@687813 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp31
-rw-r--r--cpp/src/qpid/cluster/ConnectionInterceptor.cpp36
-rw-r--r--cpp/src/qpid/cluster/ConnectionInterceptor.h28
-rw-r--r--cpp/src/qpid/cluster/OutputInterceptor.cpp106
-rw-r--r--cpp/src/qpid/cluster/OutputInterceptor.h74
-rw-r--r--cpp/src/qpid/cluster/WriteEstimate.cpp63
-rw-r--r--cpp/src/qpid/cluster/WriteEstimate.h64
7 files changed, 348 insertions, 54 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 05ab9148b5..75ce19e835 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -25,12 +25,14 @@
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ClusterNotifyBody.h"
#include "qpid/framing/ClusterConnectionCloseBody.h"
+#include "qpid/framing/ClusterConnectionDoOutputBody.h"
#include "qpid/log/Statement.h"
#include "qpid/memory.h"
#include "qpid/shared_ptr.h"
#include <boost/bind.hpp>
#include <boost/cast.hpp>
+#include <boost/current_function.hpp>
#include <algorithm>
#include <iterator>
#include <map>
@@ -76,11 +78,6 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
cpg.join(name);
notify();
- // FIXME aconway 2008-08-11: can we remove this loop?
- // Dispatch till we show up in the cluster map.
- while (empty())
- cpg.dispatchOne();
-
// Start dispatching from the poller.
cpgDispatchHandle.startWatch(poller);
deliverQueue.start(poller);
@@ -97,9 +94,8 @@ Cluster::~Cluster() {
std::for_each(localConnectionSet.begin(), localConnectionSet.end(), boost::bind(&ConnectionInterceptor::dirtyClose, _1));
}
-// local connection initializes plugins
void Cluster::initialize(broker::Connection& c) {
- bool isLocal = &c.getOutput() != &shadowOut;
+ bool isLocal = c.getOutput().get() != &shadowOut;
if (isLocal)
localConnectionSet.insert(new ConnectionInterceptor(c, *this));
}
@@ -107,10 +103,8 @@ void Cluster::initialize(broker::Connection& c) {
void Cluster::leave() {
Mutex::ScopedLock l(lock);
if (!broker) return; // Already left.
- // At this point the poller has already been shut down so
- // no dispatches can occur thru the cpgDispatchHandle.
- //
- // FIXME aconway 2008-08-11: assert this is the cae.
+ // Leave is called by from Broker destructor after the poller has
+ // been shut down. No dispatches can occur.
QPID_LOG(debug, "Leaving cluster " << *this);
cpg.leave(name);
@@ -173,13 +167,6 @@ Cluster::MemberList Cluster::getMembers() const {
return result;
}
-// ################ HERE - leaking shadow connections.
-// FIXME aconway 2008-08-11: revisit memory management for shadow
-// connections, what if the Connection is closed other than via
-// disconnect? Dangling pointer in shadow map. Use ptr_map for shadow
-// map, add deleted state to ConnectionInterceptor? Interceptors need
-// to know about map? Check how Connections can be deleted.
-
ConnectionInterceptor* Cluster::getShadowConnection(const Cpg::Id& member, void* remotePtr) {
ShadowConnectionId id(member, remotePtr);
ShadowConnectionMap::iterator i = shadowConnectionMap.find(id);
@@ -274,7 +261,8 @@ void Cluster::handleMethod(Id from, ConnectionInterceptor* connection, AMQMethod
break;
}
case CLUSTER_CONNECTION_DO_OUTPUT_METHOD_ID: {
- connection->deliverDoOutput();
+ ClusterConnectionDoOutputBody& doOutput = static_cast<ClusterConnectionDoOutputBody&>(method);
+ connection->deliverDoOutput(doOutput.getBytes());
break;
}
default:
@@ -309,9 +297,8 @@ void Cluster::dispatch(sys::DispatchHandle& h) {
void Cluster::disconnect(sys::DispatchHandle& h) {
h.stopWatch();
- // FIXME aconway 2008-08-11: error handling if we are disconnected.
- // Kill the broker?
- assert(0);
+ QPID_LOG(critical, "Disconnected from cluster, shutting down");
+ broker.shutdown();
}
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/ConnectionInterceptor.cpp b/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
index c13651eccb..efcab1b731 100644
--- a/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
+++ b/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
@@ -22,6 +22,8 @@
#include "qpid/framing/ClusterConnectionCloseBody.h"
#include "qpid/framing/ClusterConnectionDoOutputBody.h"
#include "qpid/framing/AMQFrame.h"
+#include <boost/current_function.hpp>
+
namespace qpid {
namespace cluster {
@@ -32,24 +34,27 @@ template <class T, class U, class V> void shift(T& a, U& b, const V& c) { a = b;
ConnectionInterceptor::ConnectionInterceptor(
broker::Connection& conn, Cluster& clust, Cluster::ShadowConnectionId shadowId_)
- : connection(&conn), cluster(clust), isClosed(false), shadowId(shadowId_)
+ : connection(&conn), cluster(clust), isClosed(false), shadowId(shadowId_), output(*this, *conn.getOutput().get())
{
connection->addFinalizer(boost::bind(operator delete, this));
+ connection->setOutputHandler(&output),
// Attach my functions to Connection extension points.
shift(receivedNext, connection->receivedFn, boost::bind(&ConnectionInterceptor::received, this, _1));
shift(closedNext, connection->closedFn, boost::bind(&ConnectionInterceptor::closed, this));
- shift(doOutputNext, connection->doOutputFn, boost::bind(&ConnectionInterceptor::doOutput, this));
+ shift(output.doOutputNext, connection->doOutputFn, boost::bind(&OutputInterceptor::doOutput, &output));
}
ConnectionInterceptor::~ConnectionInterceptor() {
assert(connection == 0);
}
+// Forward all received frames to the cluster, continue handling on delivery.
void ConnectionInterceptor::received(framing::AMQFrame& f) {
if (isClosed) return;
cluster.send(f, this);
}
+// Continue normal handling of delivered frames.
void ConnectionInterceptor::deliver(framing::AMQFrame& f) {
receivedNext(f);
}
@@ -81,28 +86,17 @@ void ConnectionInterceptor::deliverClosed() {
}
void ConnectionInterceptor::dirtyClose() {
- // Not closed via cluster self-delivery but closed locally.
- // Used for dirty cluster shutdown where active connections
- // must be cleaned up.
+ // Not closed via cluster self-delivery but closed locally. Used
+ // when local broker is shut down without a clean cluster shutdown.
+ // Release the connection, it will delete this.
connection = 0;
}
-bool ConnectionInterceptor::doOutput() {
- // FIXME aconway 2008-08-15: this is not correct.
- // Run in write threads so order of execution of doOutput is not determinate.
- // Will only work reliably for in single-consumer tests.
-
- if (connection->hasOutput()) {
- cluster.send(AMQFrame(in_place<ClusterConnectionDoOutputBody>()), this);
- return doOutputNext();
- }
- return false;
-}
-
-void ConnectionInterceptor::deliverDoOutput() {
- // FIXME aconway 2008-08-15: see comment in doOutput.
- if (isShadow())
- doOutputNext();
+// Delivery of doOutput allows us to run the real connection doOutput()
+// which stocks up the write buffers with data.
+//
+void ConnectionInterceptor::deliverDoOutput(size_t requested) {
+ output.deliverDoOutput(requested);
}
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/ConnectionInterceptor.h b/cpp/src/qpid/cluster/ConnectionInterceptor.h
index 370572bd9d..9216921067 100644
--- a/cpp/src/qpid/cluster/ConnectionInterceptor.h
+++ b/cpp/src/qpid/cluster/ConnectionInterceptor.h
@@ -1,5 +1,5 @@
-#ifndef QPID_CLUSTER_CONNECTIONPLUGIN_H
-#define QPID_CLUSTER_CONNECTIONPLUGIN_H
+#ifndef QPID_CLUSTER_CONNECTIONINTERCEPTOR_H
+#define QPID_CLUSTER_CONNECTIONINTERCEPTOR_H
/*
*
@@ -23,6 +23,8 @@
*/
#include "Cluster.h"
+#include "WriteEstimate.h"
+#include "OutputInterceptor.h"
#include "qpid/broker/Connection.h"
#include "qpid/sys/ConnectionOutputHandler.h"
@@ -41,42 +43,46 @@ class ConnectionInterceptor {
Cluster::ShadowConnectionId getShadowId() const { return shadowId; }
- bool isLocal() const { return shadowId == Cluster::ShadowConnectionId(0,0); }
-
+ bool isShadow() const { return shadowId != Cluster::ShadowConnectionId(0,0); }
+ bool isLocal() const { return !isShadow(); }
+ bool getClosed() const { return isClosed; }
+
// self-delivery of intercepted extension points.
void deliver(framing::AMQFrame& f);
void deliverClosed();
- void deliverDoOutput();
+ void deliverDoOutput(size_t requested);
void dirtyClose();
+ Cluster& getCluster() { return cluster; }
+
private:
struct NullConnectionHandler : public qpid::sys::ConnectionOutputHandler {
void close() {}
void send(framing::AMQFrame&) {}
- void doOutput() {}
void activateOutput() {}
};
- bool isShadow() { return shadowId != Cluster::ShadowConnectionId(0,0); }
-
// Functions to intercept to Connection extension points.
void received(framing::AMQFrame&);
void closed();
bool doOutput();
+ void activateOutput();
+
+ void sendDoOutput();
boost::function<void (framing::AMQFrame&)> receivedNext;
boost::function<void ()> closedNext;
- boost::function<bool ()> doOutputNext;
boost::intrusive_ptr<broker::Connection> connection;
Cluster& cluster;
NullConnectionHandler discardHandler;
bool isClosed;
Cluster::ShadowConnectionId shadowId;
+ WriteEstimate writeEstimate;
+ OutputInterceptor output;
};
}} // namespace qpid::cluster
-#endif /*!QPID_CLUSTER_CONNECTIONPLUGIN_H*/
-
+#endif /*!QPID_CLUSTER_CONNECTIONINTERCEPTOR_H*/
diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp
new file mode 100644
index 0000000000..84d3a6ad69
--- /dev/null
+++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "OutputInterceptor.h"
+#include "ConnectionInterceptor.h"
+#include "qpid/framing/ClusterConnectionDoOutputBody.h"
+#include "qpid/framing/AMQFrame.h"
+#include <boost/current_function.hpp>
+
+
+namespace qpid {
+namespace cluster {
+
+using namespace framing;
+
+OutputInterceptor::OutputInterceptor(ConnectionInterceptor& p, sys::ConnectionOutputHandler& h)
+ : parent(p), next(h), sent(), moreOutput(), doingOutput()
+{}
+
+void OutputInterceptor::send(framing::AMQFrame& f) {
+ Locker l(lock);
+ next.send(f);
+ sent += f.size();
+}
+
+void OutputInterceptor::activateOutput() {
+ Locker l(lock);
+ moreOutput = true;
+ sendDoOutput();
+}
+
+// Called in write thread when the IO layer has no more data to write.
+// We do nothing in the write thread, we run doOutput only on delivery
+// of doOutput requests.
+bool OutputInterceptor::doOutput() {
+ return false;
+}
+
+// Delivery of doOutput allows us to run the real connection doOutput()
+// which stocks up the write buffers with data.
+//
+void OutputInterceptor::deliverDoOutput(size_t requested) {
+ if (parent.getClosed()) return;
+
+ Locker l(lock);
+ size_t buf = next.getBuffered();
+ if (parent.isLocal())
+ writeEstimate.delivered(sent, buf); // Update the estimate.
+
+ // Run the real doOutput() till we have added the requested data or there's nothing to output.
+ sent = 0;
+ do {
+ sys::Mutex::ScopedUnlock u(lock);
+ moreOutput = doOutputNext(); // Calls send()
+ } while (sent < requested && moreOutput);
+ sent += buf; // Include buffered data in the sent total.
+
+ QPID_LOG(trace, "Delivered doOutput: requested=" << requested << " output=" << sent << " more=" << moreOutput);
+
+ if (parent.isLocal() && moreOutput)
+ sendDoOutput();
+ else
+ doingOutput = false;
+}
+
+void OutputInterceptor::startDoOutput() {
+ if (!doingOutput)
+ sendDoOutput();
+}
+
+// Send a doOutput request if one is not already in flight.
+void OutputInterceptor::sendDoOutput() {
+ // Call with lock held.
+ if (parent.isShadow() || parent.getClosed())
+ return;
+
+ doingOutput = true;
+ size_t request = writeEstimate.sending(getBuffered());
+
+ // Note we may send 0 size request if there's more than 2*estimate in the buffer.
+ // Send it anyway to keep the doOutput chain going until we are sure there's no more output
+ // (in deliverDoOutput)
+ //
+ parent.getCluster().send(AMQFrame(in_place<ClusterConnectionDoOutputBody>(
+ framing::ProtocolVersion(), request)), &parent);
+ QPID_LOG(trace, &parent << "Send doOutput request for " << request);
+}
+
+}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/OutputInterceptor.h b/cpp/src/qpid/cluster/OutputInterceptor.h
new file mode 100644
index 0000000000..b39f2a2be9
--- /dev/null
+++ b/cpp/src/qpid/cluster/OutputInterceptor.h
@@ -0,0 +1,74 @@
+#ifndef QPID_CLUSTER_OUTPUTINTERCEPTOR_H
+#define QPID_CLUSTER_OUTPUTINTERCEPTOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "WriteEstimate.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/broker/ConnectionFactory.h"
+#include <boost/function.hpp>
+
+namespace qpid {
+namespace framing { class AMQFrame; }
+namespace cluster {
+
+class ConnectionInterceptor;
+
+/**
+ * Interceptor for connection OutputHandler, manages outgoing message replication.
+ */
+class OutputInterceptor : public sys::ConnectionOutputHandler {
+ public:
+ OutputInterceptor(ConnectionInterceptor& p, sys::ConnectionOutputHandler& h);
+
+ // sys::ConnectionOutputHandler functions
+ void send(framing::AMQFrame& f);
+ void activateOutput();
+ void close() { Locker l(lock); next.close(); }
+ size_t getBuffered() const { Locker l(lock); return next.getBuffered(); }
+
+ // Delivery point for doOutput requests.
+ void deliverDoOutput(size_t requested);
+ // Intercept doOutput requests on Connection.
+ bool doOutput();
+
+ boost::function<bool ()> doOutputNext;
+
+ ConnectionInterceptor& parent;
+
+ private:
+ typedef sys::Mutex::ScopedLock Locker;
+
+ void startDoOutput();
+ void sendDoOutput();
+
+ mutable sys::Mutex lock;
+ sys::ConnectionOutputHandler& next;
+ size_t sent;
+ WriteEstimate writeEstimate;
+ bool moreOutput;
+ bool doingOutput;
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_OUTPUTINTERCEPTOR_H*/
diff --git a/cpp/src/qpid/cluster/WriteEstimate.cpp b/cpp/src/qpid/cluster/WriteEstimate.cpp
new file mode 100644
index 0000000000..81131be437
--- /dev/null
+++ b/cpp/src/qpid/cluster/WriteEstimate.cpp
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "WriteEstimate.h"
+#include "qpid/log/Statement.h"
+#include <boost/current_function.hpp>
+
+namespace qpid {
+namespace cluster {
+
+WriteEstimate::WriteEstimate(size_t initial)
+ : growing(true), estimate(initial) {}
+
+size_t WriteEstimate::sending(size_t buffered) {
+ // We want to send a doOutput request for enough data such
+ // that if estimate bytes are written before it is self
+ // delivered then what is left in the buffer plus the doOutput
+ // request will be estimate bytes.
+
+ size_t predictLeft = (buffered > estimate) ? buffered - estimate : 0;
+ size_t request = (estimate > predictLeft) ? estimate - predictLeft : 0;
+ return request;
+}
+
+size_t pad(size_t value) { return value + value/2; }
+
+void WriteEstimate::delivered(size_t sent, size_t buffered) {
+ size_t wrote = sent > buffered ? sent - buffered : 0;
+ if (wrote == 0) // No change
+ return;
+ if (buffered > 0) { // Buffer was over-stocked, we wrote to capacity.
+ growing = false;
+ estimate = pad(wrote); // Estimate at 1.5 write for padding.
+ }
+ else if (wrote > estimate) { // Wrote everything, buffer was under-stocked
+ if (growing)
+ estimate = std::max(estimate*2, pad(wrote)); // Grow quickly if we have not yet seen an over-stock.
+ else
+ estimate = pad(wrote);
+ }
+}
+
+}} // namespace qpid::cluster
+
+
diff --git a/cpp/src/qpid/cluster/WriteEstimate.h b/cpp/src/qpid/cluster/WriteEstimate.h
new file mode 100644
index 0000000000..01ab2a3e34
--- /dev/null
+++ b/cpp/src/qpid/cluster/WriteEstimate.h
@@ -0,0 +1,64 @@
+#ifndef QPID_CLUSTER_WRITEESTIMATE_H
+#define QPID_CLUSTER_WRITEESTIMATE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Mutex.h"
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Estimate the amount of data that a connection can write between sending
+ * a doOutput notice and re-receiving it.
+ *
+ * The goal is to avoid ever write-idling the connection by sending
+ * the next doOutput request as soon as we've processed the previous
+ * one, such that data generated by the previous request will keep the
+ * writer busy till the next one is delivered.
+ *
+ */
+class WriteEstimate
+{
+ public:
+ WriteEstimate(size_t initial=4096);
+
+ /** About to send a doOutput request.
+ * Update estimation state and return size for next request.
+ */
+ size_t sending(size_t buffered);
+
+ /**
+ * doOutput request just delivered, not yet executed. Update the estimate.
+ * and estimate how much data to request in the next onOutput
+ * request. 0 means don't send an onOutput request.
+ */
+ void delivered(size_t sent, size_t buffered);
+
+ private:
+ bool growing;
+ size_t estimate;
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_WRITEESTIMATE_H*/