diff options
author | Alan Conway <aconway@apache.org> | 2008-08-21 18:04:18 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-08-21 18:04:18 +0000 |
commit | 2b97a69197fb986c209339c48ed98bb45203e107 (patch) | |
tree | 8bd157cc9d19757b6d9c00c5ab2c353ca336f8bf /cpp/src/qpid/cluster | |
parent | c6c237e2450250a6ef18c5af93e2a733aba10932 (diff) | |
download | qpid-python-2b97a69197fb986c209339c48ed98bb45203e107.tar.gz |
Pre-buffering output strategy for cluster.
Additional hooks in broker code, should not affect standalone broker.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@687813 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 31 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ConnectionInterceptor.cpp | 36 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ConnectionInterceptor.h | 28 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/OutputInterceptor.cpp | 106 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/OutputInterceptor.h | 74 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/WriteEstimate.cpp | 63 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/WriteEstimate.h | 64 |
7 files changed, 348 insertions, 54 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 05ab9148b5..75ce19e835 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -25,12 +25,14 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ClusterNotifyBody.h" #include "qpid/framing/ClusterConnectionCloseBody.h" +#include "qpid/framing/ClusterConnectionDoOutputBody.h" #include "qpid/log/Statement.h" #include "qpid/memory.h" #include "qpid/shared_ptr.h" #include <boost/bind.hpp> #include <boost/cast.hpp> +#include <boost/current_function.hpp> #include <algorithm> #include <iterator> #include <map> @@ -76,11 +78,6 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : cpg.join(name); notify(); - // FIXME aconway 2008-08-11: can we remove this loop? - // Dispatch till we show up in the cluster map. - while (empty()) - cpg.dispatchOne(); - // Start dispatching from the poller. cpgDispatchHandle.startWatch(poller); deliverQueue.start(poller); @@ -97,9 +94,8 @@ Cluster::~Cluster() { std::for_each(localConnectionSet.begin(), localConnectionSet.end(), boost::bind(&ConnectionInterceptor::dirtyClose, _1)); } -// local connection initializes plugins void Cluster::initialize(broker::Connection& c) { - bool isLocal = &c.getOutput() != &shadowOut; + bool isLocal = c.getOutput().get() != &shadowOut; if (isLocal) localConnectionSet.insert(new ConnectionInterceptor(c, *this)); } @@ -107,10 +103,8 @@ void Cluster::initialize(broker::Connection& c) { void Cluster::leave() { Mutex::ScopedLock l(lock); if (!broker) return; // Already left. - // At this point the poller has already been shut down so - // no dispatches can occur thru the cpgDispatchHandle. - // - // FIXME aconway 2008-08-11: assert this is the cae. + // Leave is called by from Broker destructor after the poller has + // been shut down. No dispatches can occur. QPID_LOG(debug, "Leaving cluster " << *this); cpg.leave(name); @@ -173,13 +167,6 @@ Cluster::MemberList Cluster::getMembers() const { return result; } -// ################ HERE - leaking shadow connections. -// FIXME aconway 2008-08-11: revisit memory management for shadow -// connections, what if the Connection is closed other than via -// disconnect? Dangling pointer in shadow map. Use ptr_map for shadow -// map, add deleted state to ConnectionInterceptor? Interceptors need -// to know about map? Check how Connections can be deleted. - ConnectionInterceptor* Cluster::getShadowConnection(const Cpg::Id& member, void* remotePtr) { ShadowConnectionId id(member, remotePtr); ShadowConnectionMap::iterator i = shadowConnectionMap.find(id); @@ -274,7 +261,8 @@ void Cluster::handleMethod(Id from, ConnectionInterceptor* connection, AMQMethod break; } case CLUSTER_CONNECTION_DO_OUTPUT_METHOD_ID: { - connection->deliverDoOutput(); + ClusterConnectionDoOutputBody& doOutput = static_cast<ClusterConnectionDoOutputBody&>(method); + connection->deliverDoOutput(doOutput.getBytes()); break; } default: @@ -309,9 +297,8 @@ void Cluster::dispatch(sys::DispatchHandle& h) { void Cluster::disconnect(sys::DispatchHandle& h) { h.stopWatch(); - // FIXME aconway 2008-08-11: error handling if we are disconnected. - // Kill the broker? - assert(0); + QPID_LOG(critical, "Disconnected from cluster, shutting down"); + broker.shutdown(); } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ConnectionInterceptor.cpp b/cpp/src/qpid/cluster/ConnectionInterceptor.cpp index c13651eccb..efcab1b731 100644 --- a/cpp/src/qpid/cluster/ConnectionInterceptor.cpp +++ b/cpp/src/qpid/cluster/ConnectionInterceptor.cpp @@ -22,6 +22,8 @@ #include "qpid/framing/ClusterConnectionCloseBody.h" #include "qpid/framing/ClusterConnectionDoOutputBody.h" #include "qpid/framing/AMQFrame.h" +#include <boost/current_function.hpp> + namespace qpid { namespace cluster { @@ -32,24 +34,27 @@ template <class T, class U, class V> void shift(T& a, U& b, const V& c) { a = b; ConnectionInterceptor::ConnectionInterceptor( broker::Connection& conn, Cluster& clust, Cluster::ShadowConnectionId shadowId_) - : connection(&conn), cluster(clust), isClosed(false), shadowId(shadowId_) + : connection(&conn), cluster(clust), isClosed(false), shadowId(shadowId_), output(*this, *conn.getOutput().get()) { connection->addFinalizer(boost::bind(operator delete, this)); + connection->setOutputHandler(&output), // Attach my functions to Connection extension points. shift(receivedNext, connection->receivedFn, boost::bind(&ConnectionInterceptor::received, this, _1)); shift(closedNext, connection->closedFn, boost::bind(&ConnectionInterceptor::closed, this)); - shift(doOutputNext, connection->doOutputFn, boost::bind(&ConnectionInterceptor::doOutput, this)); + shift(output.doOutputNext, connection->doOutputFn, boost::bind(&OutputInterceptor::doOutput, &output)); } ConnectionInterceptor::~ConnectionInterceptor() { assert(connection == 0); } +// Forward all received frames to the cluster, continue handling on delivery. void ConnectionInterceptor::received(framing::AMQFrame& f) { if (isClosed) return; cluster.send(f, this); } +// Continue normal handling of delivered frames. void ConnectionInterceptor::deliver(framing::AMQFrame& f) { receivedNext(f); } @@ -81,28 +86,17 @@ void ConnectionInterceptor::deliverClosed() { } void ConnectionInterceptor::dirtyClose() { - // Not closed via cluster self-delivery but closed locally. - // Used for dirty cluster shutdown where active connections - // must be cleaned up. + // Not closed via cluster self-delivery but closed locally. Used + // when local broker is shut down without a clean cluster shutdown. + // Release the connection, it will delete this. connection = 0; } -bool ConnectionInterceptor::doOutput() { - // FIXME aconway 2008-08-15: this is not correct. - // Run in write threads so order of execution of doOutput is not determinate. - // Will only work reliably for in single-consumer tests. - - if (connection->hasOutput()) { - cluster.send(AMQFrame(in_place<ClusterConnectionDoOutputBody>()), this); - return doOutputNext(); - } - return false; -} - -void ConnectionInterceptor::deliverDoOutput() { - // FIXME aconway 2008-08-15: see comment in doOutput. - if (isShadow()) - doOutputNext(); +// Delivery of doOutput allows us to run the real connection doOutput() +// which stocks up the write buffers with data. +// +void ConnectionInterceptor::deliverDoOutput(size_t requested) { + output.deliverDoOutput(requested); } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ConnectionInterceptor.h b/cpp/src/qpid/cluster/ConnectionInterceptor.h index 370572bd9d..9216921067 100644 --- a/cpp/src/qpid/cluster/ConnectionInterceptor.h +++ b/cpp/src/qpid/cluster/ConnectionInterceptor.h @@ -1,5 +1,5 @@ -#ifndef QPID_CLUSTER_CONNECTIONPLUGIN_H -#define QPID_CLUSTER_CONNECTIONPLUGIN_H +#ifndef QPID_CLUSTER_CONNECTIONINTERCEPTOR_H +#define QPID_CLUSTER_CONNECTIONINTERCEPTOR_H /* * @@ -23,6 +23,8 @@ */ #include "Cluster.h" +#include "WriteEstimate.h" +#include "OutputInterceptor.h" #include "qpid/broker/Connection.h" #include "qpid/sys/ConnectionOutputHandler.h" @@ -41,42 +43,46 @@ class ConnectionInterceptor { Cluster::ShadowConnectionId getShadowId() const { return shadowId; } - bool isLocal() const { return shadowId == Cluster::ShadowConnectionId(0,0); } - + bool isShadow() const { return shadowId != Cluster::ShadowConnectionId(0,0); } + bool isLocal() const { return !isShadow(); } + bool getClosed() const { return isClosed; } + // self-delivery of intercepted extension points. void deliver(framing::AMQFrame& f); void deliverClosed(); - void deliverDoOutput(); + void deliverDoOutput(size_t requested); void dirtyClose(); + Cluster& getCluster() { return cluster; } + private: struct NullConnectionHandler : public qpid::sys::ConnectionOutputHandler { void close() {} void send(framing::AMQFrame&) {} - void doOutput() {} void activateOutput() {} }; - bool isShadow() { return shadowId != Cluster::ShadowConnectionId(0,0); } - // Functions to intercept to Connection extension points. void received(framing::AMQFrame&); void closed(); bool doOutput(); + void activateOutput(); + + void sendDoOutput(); boost::function<void (framing::AMQFrame&)> receivedNext; boost::function<void ()> closedNext; - boost::function<bool ()> doOutputNext; boost::intrusive_ptr<broker::Connection> connection; Cluster& cluster; NullConnectionHandler discardHandler; bool isClosed; Cluster::ShadowConnectionId shadowId; + WriteEstimate writeEstimate; + OutputInterceptor output; }; }} // namespace qpid::cluster -#endif /*!QPID_CLUSTER_CONNECTIONPLUGIN_H*/ - +#endif /*!QPID_CLUSTER_CONNECTIONINTERCEPTOR_H*/ diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp new file mode 100644 index 0000000000..84d3a6ad69 --- /dev/null +++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -0,0 +1,106 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "OutputInterceptor.h" +#include "ConnectionInterceptor.h" +#include "qpid/framing/ClusterConnectionDoOutputBody.h" +#include "qpid/framing/AMQFrame.h" +#include <boost/current_function.hpp> + + +namespace qpid { +namespace cluster { + +using namespace framing; + +OutputInterceptor::OutputInterceptor(ConnectionInterceptor& p, sys::ConnectionOutputHandler& h) + : parent(p), next(h), sent(), moreOutput(), doingOutput() +{} + +void OutputInterceptor::send(framing::AMQFrame& f) { + Locker l(lock); + next.send(f); + sent += f.size(); +} + +void OutputInterceptor::activateOutput() { + Locker l(lock); + moreOutput = true; + sendDoOutput(); +} + +// Called in write thread when the IO layer has no more data to write. +// We do nothing in the write thread, we run doOutput only on delivery +// of doOutput requests. +bool OutputInterceptor::doOutput() { + return false; +} + +// Delivery of doOutput allows us to run the real connection doOutput() +// which stocks up the write buffers with data. +// +void OutputInterceptor::deliverDoOutput(size_t requested) { + if (parent.getClosed()) return; + + Locker l(lock); + size_t buf = next.getBuffered(); + if (parent.isLocal()) + writeEstimate.delivered(sent, buf); // Update the estimate. + + // Run the real doOutput() till we have added the requested data or there's nothing to output. + sent = 0; + do { + sys::Mutex::ScopedUnlock u(lock); + moreOutput = doOutputNext(); // Calls send() + } while (sent < requested && moreOutput); + sent += buf; // Include buffered data in the sent total. + + QPID_LOG(trace, "Delivered doOutput: requested=" << requested << " output=" << sent << " more=" << moreOutput); + + if (parent.isLocal() && moreOutput) + sendDoOutput(); + else + doingOutput = false; +} + +void OutputInterceptor::startDoOutput() { + if (!doingOutput) + sendDoOutput(); +} + +// Send a doOutput request if one is not already in flight. +void OutputInterceptor::sendDoOutput() { + // Call with lock held. + if (parent.isShadow() || parent.getClosed()) + return; + + doingOutput = true; + size_t request = writeEstimate.sending(getBuffered()); + + // Note we may send 0 size request if there's more than 2*estimate in the buffer. + // Send it anyway to keep the doOutput chain going until we are sure there's no more output + // (in deliverDoOutput) + // + parent.getCluster().send(AMQFrame(in_place<ClusterConnectionDoOutputBody>( + framing::ProtocolVersion(), request)), &parent); + QPID_LOG(trace, &parent << "Send doOutput request for " << request); +} + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/OutputInterceptor.h b/cpp/src/qpid/cluster/OutputInterceptor.h new file mode 100644 index 0000000000..b39f2a2be9 --- /dev/null +++ b/cpp/src/qpid/cluster/OutputInterceptor.h @@ -0,0 +1,74 @@ +#ifndef QPID_CLUSTER_OUTPUTINTERCEPTOR_H +#define QPID_CLUSTER_OUTPUTINTERCEPTOR_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "WriteEstimate.h" +#include "qpid/sys/ConnectionOutputHandler.h" +#include "qpid/broker/ConnectionFactory.h" +#include <boost/function.hpp> + +namespace qpid { +namespace framing { class AMQFrame; } +namespace cluster { + +class ConnectionInterceptor; + +/** + * Interceptor for connection OutputHandler, manages outgoing message replication. + */ +class OutputInterceptor : public sys::ConnectionOutputHandler { + public: + OutputInterceptor(ConnectionInterceptor& p, sys::ConnectionOutputHandler& h); + + // sys::ConnectionOutputHandler functions + void send(framing::AMQFrame& f); + void activateOutput(); + void close() { Locker l(lock); next.close(); } + size_t getBuffered() const { Locker l(lock); return next.getBuffered(); } + + // Delivery point for doOutput requests. + void deliverDoOutput(size_t requested); + // Intercept doOutput requests on Connection. + bool doOutput(); + + boost::function<bool ()> doOutputNext; + + ConnectionInterceptor& parent; + + private: + typedef sys::Mutex::ScopedLock Locker; + + void startDoOutput(); + void sendDoOutput(); + + mutable sys::Mutex lock; + sys::ConnectionOutputHandler& next; + size_t sent; + WriteEstimate writeEstimate; + bool moreOutput; + bool doingOutput; +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_OUTPUTINTERCEPTOR_H*/ diff --git a/cpp/src/qpid/cluster/WriteEstimate.cpp b/cpp/src/qpid/cluster/WriteEstimate.cpp new file mode 100644 index 0000000000..81131be437 --- /dev/null +++ b/cpp/src/qpid/cluster/WriteEstimate.cpp @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "WriteEstimate.h" +#include "qpid/log/Statement.h" +#include <boost/current_function.hpp> + +namespace qpid { +namespace cluster { + +WriteEstimate::WriteEstimate(size_t initial) + : growing(true), estimate(initial) {} + +size_t WriteEstimate::sending(size_t buffered) { + // We want to send a doOutput request for enough data such + // that if estimate bytes are written before it is self + // delivered then what is left in the buffer plus the doOutput + // request will be estimate bytes. + + size_t predictLeft = (buffered > estimate) ? buffered - estimate : 0; + size_t request = (estimate > predictLeft) ? estimate - predictLeft : 0; + return request; +} + +size_t pad(size_t value) { return value + value/2; } + +void WriteEstimate::delivered(size_t sent, size_t buffered) { + size_t wrote = sent > buffered ? sent - buffered : 0; + if (wrote == 0) // No change + return; + if (buffered > 0) { // Buffer was over-stocked, we wrote to capacity. + growing = false; + estimate = pad(wrote); // Estimate at 1.5 write for padding. + } + else if (wrote > estimate) { // Wrote everything, buffer was under-stocked + if (growing) + estimate = std::max(estimate*2, pad(wrote)); // Grow quickly if we have not yet seen an over-stock. + else + estimate = pad(wrote); + } +} + +}} // namespace qpid::cluster + + diff --git a/cpp/src/qpid/cluster/WriteEstimate.h b/cpp/src/qpid/cluster/WriteEstimate.h new file mode 100644 index 0000000000..01ab2a3e34 --- /dev/null +++ b/cpp/src/qpid/cluster/WriteEstimate.h @@ -0,0 +1,64 @@ +#ifndef QPID_CLUSTER_WRITEESTIMATE_H +#define QPID_CLUSTER_WRITEESTIMATE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Mutex.h" + +namespace qpid { +namespace cluster { + +/** + * Estimate the amount of data that a connection can write between sending + * a doOutput notice and re-receiving it. + * + * The goal is to avoid ever write-idling the connection by sending + * the next doOutput request as soon as we've processed the previous + * one, such that data generated by the previous request will keep the + * writer busy till the next one is delivered. + * + */ +class WriteEstimate +{ + public: + WriteEstimate(size_t initial=4096); + + /** About to send a doOutput request. + * Update estimation state and return size for next request. + */ + size_t sending(size_t buffered); + + /** + * doOutput request just delivered, not yet executed. Update the estimate. + * and estimate how much data to request in the next onOutput + * request. 0 means don't send an onOutput request. + */ + void delivered(size_t sent, size_t buffered); + + private: + bool growing; + size_t estimate; +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_WRITEESTIMATE_H*/ |