diff options
author | Alan Conway <aconway@apache.org> | 2008-08-21 18:04:18 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-08-21 18:04:18 +0000 |
commit | 2b97a69197fb986c209339c48ed98bb45203e107 (patch) | |
tree | 8bd157cc9d19757b6d9c00c5ab2c353ca336f8bf /cpp/src/qpid | |
parent | c6c237e2450250a6ef18c5af93e2a733aba10932 (diff) | |
download | qpid-python-2b97a69197fb986c209339c48ed98bb45203e107.tar.gz |
Pre-buffering output strategy for cluster.
Additional hooks in broker code, should not affect standalone broker.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@687813 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r-- | cpp/src/qpid/amqp_0_10/Connection.cpp | 18 | ||||
-rw-r--r-- | cpp/src/qpid/amqp_0_10/Connection.h | 10 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ConnectionState.h | 16 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 31 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ConnectionInterceptor.cpp | 36 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ConnectionInterceptor.h | 28 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/OutputInterceptor.cpp | 106 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/OutputInterceptor.h | 74 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/WriteEstimate.cpp | 63 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/WriteEstimate.h | 64 | ||||
-rw-r--r-- | cpp/src/qpid/sys/ConnectionOutputHandler.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h | 54 |
13 files changed, 434 insertions, 72 deletions
diff --git a/cpp/src/qpid/amqp_0_10/Connection.cpp b/cpp/src/qpid/amqp_0_10/Connection.cpp index a3692911b2..0b996dedd2 100644 --- a/cpp/src/qpid/amqp_0_10/Connection.cpp +++ b/cpp/src/qpid/amqp_0_10/Connection.cpp @@ -30,7 +30,7 @@ using sys::Mutex; Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient) : frameQueueClosed(false), output(o), connection(new broker::Connection(this, broker, id, _isClient)), - identifier(id), initialized(false), isClient(_isClient) {} + identifier(id), initialized(false), isClient(_isClient), buffered(0) {} size_t Connection::decode(const char* buffer, size_t size) { framing::Buffer in(const_cast<char*>(buffer), size); @@ -53,7 +53,7 @@ size_t Connection::decode(const char* buffer, size_t size) { bool Connection::canEncode() { if (!frameQueueClosed) connection->doOutput(); - Mutex::ScopedLock l(frameQueueLock); + Mutex::ScopedLock l(frameQueueLock); return (!isClient && !initialized) || !frameQueue.empty(); } @@ -71,10 +71,12 @@ size_t Connection::encode(const char* buffer, size_t size) { initialized = true; QPID_LOG(trace, "SENT " << identifier << " INIT(" << pi << ")"); } - while (!frameQueue.empty() && (frameQueue.front().size() <= out.available())) { + size_t frameSize=0; + while (!frameQueue.empty() && ((frameSize=frameQueue.front().size()) <= out.available())) { frameQueue.front().encode(out); QPID_LOG(trace, "SENT [" << identifier << "]: " << frameQueue.front()); - frameQueue.pop(); + frameQueue.pop_front(); + buffered -= frameSize; } assert(frameQueue.empty() || frameQueue.front().size() <= size); if (!frameQueue.empty() && frameQueue.front().size() > size) @@ -98,7 +100,8 @@ void Connection::send(framing::AMQFrame& f) { { Mutex::ScopedLock l(frameQueueLock); if (!frameQueueClosed) - frameQueue.push(f); + frameQueue.push_back(f); + buffered += f.size(); } activateOutput(); } @@ -107,4 +110,9 @@ framing::ProtocolVersion Connection::getVersion() const { return framing::ProtocolVersion(0,10); } +size_t Connection::getBuffered() const { + Mutex::ScopedLock l(frameQueueLock); + return buffered; +} + }} // namespace qpid::amqp_0_10 diff --git a/cpp/src/qpid/amqp_0_10/Connection.h b/cpp/src/qpid/amqp_0_10/Connection.h index b707031789..f6fb87f928 100644 --- a/cpp/src/qpid/amqp_0_10/Connection.h +++ b/cpp/src/qpid/amqp_0_10/Connection.h @@ -26,7 +26,7 @@ #include "qpid/sys/Mutex.h" #include "qpid/broker/Connection.h" #include <boost/intrusive_ptr.hpp> -#include <queue> +#include <deque> #include <memory> namespace qpid { @@ -36,7 +36,9 @@ namespace amqp_0_10 { class Connection : public sys::ConnectionCodec, public sys::ConnectionOutputHandler { - std::queue<framing::AMQFrame> frameQueue; + typedef std::deque<framing::AMQFrame> FrameQueue; + + FrameQueue frameQueue; bool frameQueueClosed; mutable sys::Mutex frameQueueLock; sys::OutputControl& output; @@ -44,7 +46,8 @@ class Connection : public sys::ConnectionCodec, std::string identifier; bool initialized; bool isClient; - + size_t buffered; + public: Connection(sys::OutputControl&, broker::Broker&, const std::string& id, bool isClient = false); size_t decode(const char* buffer, size_t size); @@ -56,6 +59,7 @@ class Connection : public sys::ConnectionCodec, void close(); // closing from this end. void send(framing::AMQFrame&); framing::ProtocolVersion getVersion() const; + size_t getBuffered() const; }; }} // namespace qpid::amqp_0_10 diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index ab18d1f035..d65dbaeec7 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -79,7 +79,7 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std void Connection::requestIOProcessing(boost::function0<void> callback) { ioCallback = callback; - out->activateOutput(); + out.activateOutput(); } Connection::~Connection() @@ -178,7 +178,6 @@ void Connection::closedImpl(){ // Physically closed, suspend open sessions. try { while (!channels.empty()) ptr_map_ptr(channels.begin())->handleDetach(); - // FIXME aconway 2008-07-15: exclusive is per-session not per-connection in 0-10. while (!exclusiveQueues.empty()) { Queue::shared_ptr q(exclusiveQueues.front()); q->releaseExclusiveOwnership(); @@ -245,7 +244,7 @@ Manageable::status_t Connection::ManagementMethod(uint32_t methodId, Args&) case management::Connection::METHOD_CLOSE : mgmtClosing = true; if (mgmtObject != 0) mgmtObject->set_closing(1); - out->activateOutput(); + out.activateOutput(); status = Manageable::STATUS_OK; break; } diff --git a/cpp/src/qpid/broker/ConnectionState.h b/cpp/src/qpid/broker/ConnectionState.h index c9cf6ece8d..97055f8b2e 100644 --- a/cpp/src/qpid/broker/ConnectionState.h +++ b/cpp/src/qpid/broker/ConnectionState.h @@ -24,7 +24,7 @@ #include <vector> #include "qpid/sys/AggregateOutput.h" -#include "qpid/sys/ConnectionOutputHandler.h" +#include "qpid/sys/ConnectionOutputHandlerPtr.h" #include "qpid/framing/ProtocolVersion.h" #include "qpid/management/Manageable.h" #include "Broker.h" @@ -34,11 +34,14 @@ namespace broker { class ConnectionState : public ConnectionToken, public management::Manageable { + protected: + sys::ConnectionOutputHandlerPtr out; + public: - ConnectionState(qpid::sys::ConnectionOutputHandler* o, Broker& b) : + ConnectionState(qpid::sys::ConnectionOutputHandler* o, Broker& b) : + out(o), broker(b), - outputTasks(*o), - out(o), + outputTasks(out), framemax(65535), heartbeat(0), stagingThreshold(broker.getStagingThreshold()) @@ -67,14 +70,13 @@ class ConnectionState : public ConnectionToken, public management::Manageable //contained output tasks sys::AggregateOutput outputTasks; - sys::ConnectionOutputHandler& getOutput() const { return *out; } + sys::ConnectionOutputHandlerPtr& getOutput() { return out; } framing::ProtocolVersion getVersion() const { return version; } - void setOutputHandler(qpid::sys::ConnectionOutputHandler* o) { out = o; } + void setOutputHandler(qpid::sys::ConnectionOutputHandler* o) { out.set(o); } protected: framing::ProtocolVersion version; - sys::ConnectionOutputHandler* out; uint32_t framemax; uint16_t heartbeat; uint64_t stagingThreshold; diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 05ab9148b5..75ce19e835 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -25,12 +25,14 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ClusterNotifyBody.h" #include "qpid/framing/ClusterConnectionCloseBody.h" +#include "qpid/framing/ClusterConnectionDoOutputBody.h" #include "qpid/log/Statement.h" #include "qpid/memory.h" #include "qpid/shared_ptr.h" #include <boost/bind.hpp> #include <boost/cast.hpp> +#include <boost/current_function.hpp> #include <algorithm> #include <iterator> #include <map> @@ -76,11 +78,6 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : cpg.join(name); notify(); - // FIXME aconway 2008-08-11: can we remove this loop? - // Dispatch till we show up in the cluster map. - while (empty()) - cpg.dispatchOne(); - // Start dispatching from the poller. cpgDispatchHandle.startWatch(poller); deliverQueue.start(poller); @@ -97,9 +94,8 @@ Cluster::~Cluster() { std::for_each(localConnectionSet.begin(), localConnectionSet.end(), boost::bind(&ConnectionInterceptor::dirtyClose, _1)); } -// local connection initializes plugins void Cluster::initialize(broker::Connection& c) { - bool isLocal = &c.getOutput() != &shadowOut; + bool isLocal = c.getOutput().get() != &shadowOut; if (isLocal) localConnectionSet.insert(new ConnectionInterceptor(c, *this)); } @@ -107,10 +103,8 @@ void Cluster::initialize(broker::Connection& c) { void Cluster::leave() { Mutex::ScopedLock l(lock); if (!broker) return; // Already left. - // At this point the poller has already been shut down so - // no dispatches can occur thru the cpgDispatchHandle. - // - // FIXME aconway 2008-08-11: assert this is the cae. + // Leave is called by from Broker destructor after the poller has + // been shut down. No dispatches can occur. QPID_LOG(debug, "Leaving cluster " << *this); cpg.leave(name); @@ -173,13 +167,6 @@ Cluster::MemberList Cluster::getMembers() const { return result; } -// ################ HERE - leaking shadow connections. -// FIXME aconway 2008-08-11: revisit memory management for shadow -// connections, what if the Connection is closed other than via -// disconnect? Dangling pointer in shadow map. Use ptr_map for shadow -// map, add deleted state to ConnectionInterceptor? Interceptors need -// to know about map? Check how Connections can be deleted. - ConnectionInterceptor* Cluster::getShadowConnection(const Cpg::Id& member, void* remotePtr) { ShadowConnectionId id(member, remotePtr); ShadowConnectionMap::iterator i = shadowConnectionMap.find(id); @@ -274,7 +261,8 @@ void Cluster::handleMethod(Id from, ConnectionInterceptor* connection, AMQMethod break; } case CLUSTER_CONNECTION_DO_OUTPUT_METHOD_ID: { - connection->deliverDoOutput(); + ClusterConnectionDoOutputBody& doOutput = static_cast<ClusterConnectionDoOutputBody&>(method); + connection->deliverDoOutput(doOutput.getBytes()); break; } default: @@ -309,9 +297,8 @@ void Cluster::dispatch(sys::DispatchHandle& h) { void Cluster::disconnect(sys::DispatchHandle& h) { h.stopWatch(); - // FIXME aconway 2008-08-11: error handling if we are disconnected. - // Kill the broker? - assert(0); + QPID_LOG(critical, "Disconnected from cluster, shutting down"); + broker.shutdown(); } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ConnectionInterceptor.cpp b/cpp/src/qpid/cluster/ConnectionInterceptor.cpp index c13651eccb..efcab1b731 100644 --- a/cpp/src/qpid/cluster/ConnectionInterceptor.cpp +++ b/cpp/src/qpid/cluster/ConnectionInterceptor.cpp @@ -22,6 +22,8 @@ #include "qpid/framing/ClusterConnectionCloseBody.h" #include "qpid/framing/ClusterConnectionDoOutputBody.h" #include "qpid/framing/AMQFrame.h" +#include <boost/current_function.hpp> + namespace qpid { namespace cluster { @@ -32,24 +34,27 @@ template <class T, class U, class V> void shift(T& a, U& b, const V& c) { a = b; ConnectionInterceptor::ConnectionInterceptor( broker::Connection& conn, Cluster& clust, Cluster::ShadowConnectionId shadowId_) - : connection(&conn), cluster(clust), isClosed(false), shadowId(shadowId_) + : connection(&conn), cluster(clust), isClosed(false), shadowId(shadowId_), output(*this, *conn.getOutput().get()) { connection->addFinalizer(boost::bind(operator delete, this)); + connection->setOutputHandler(&output), // Attach my functions to Connection extension points. shift(receivedNext, connection->receivedFn, boost::bind(&ConnectionInterceptor::received, this, _1)); shift(closedNext, connection->closedFn, boost::bind(&ConnectionInterceptor::closed, this)); - shift(doOutputNext, connection->doOutputFn, boost::bind(&ConnectionInterceptor::doOutput, this)); + shift(output.doOutputNext, connection->doOutputFn, boost::bind(&OutputInterceptor::doOutput, &output)); } ConnectionInterceptor::~ConnectionInterceptor() { assert(connection == 0); } +// Forward all received frames to the cluster, continue handling on delivery. void ConnectionInterceptor::received(framing::AMQFrame& f) { if (isClosed) return; cluster.send(f, this); } +// Continue normal handling of delivered frames. void ConnectionInterceptor::deliver(framing::AMQFrame& f) { receivedNext(f); } @@ -81,28 +86,17 @@ void ConnectionInterceptor::deliverClosed() { } void ConnectionInterceptor::dirtyClose() { - // Not closed via cluster self-delivery but closed locally. - // Used for dirty cluster shutdown where active connections - // must be cleaned up. + // Not closed via cluster self-delivery but closed locally. Used + // when local broker is shut down without a clean cluster shutdown. + // Release the connection, it will delete this. connection = 0; } -bool ConnectionInterceptor::doOutput() { - // FIXME aconway 2008-08-15: this is not correct. - // Run in write threads so order of execution of doOutput is not determinate. - // Will only work reliably for in single-consumer tests. - - if (connection->hasOutput()) { - cluster.send(AMQFrame(in_place<ClusterConnectionDoOutputBody>()), this); - return doOutputNext(); - } - return false; -} - -void ConnectionInterceptor::deliverDoOutput() { - // FIXME aconway 2008-08-15: see comment in doOutput. - if (isShadow()) - doOutputNext(); +// Delivery of doOutput allows us to run the real connection doOutput() +// which stocks up the write buffers with data. +// +void ConnectionInterceptor::deliverDoOutput(size_t requested) { + output.deliverDoOutput(requested); } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ConnectionInterceptor.h b/cpp/src/qpid/cluster/ConnectionInterceptor.h index 370572bd9d..9216921067 100644 --- a/cpp/src/qpid/cluster/ConnectionInterceptor.h +++ b/cpp/src/qpid/cluster/ConnectionInterceptor.h @@ -1,5 +1,5 @@ -#ifndef QPID_CLUSTER_CONNECTIONPLUGIN_H -#define QPID_CLUSTER_CONNECTIONPLUGIN_H +#ifndef QPID_CLUSTER_CONNECTIONINTERCEPTOR_H +#define QPID_CLUSTER_CONNECTIONINTERCEPTOR_H /* * @@ -23,6 +23,8 @@ */ #include "Cluster.h" +#include "WriteEstimate.h" +#include "OutputInterceptor.h" #include "qpid/broker/Connection.h" #include "qpid/sys/ConnectionOutputHandler.h" @@ -41,42 +43,46 @@ class ConnectionInterceptor { Cluster::ShadowConnectionId getShadowId() const { return shadowId; } - bool isLocal() const { return shadowId == Cluster::ShadowConnectionId(0,0); } - + bool isShadow() const { return shadowId != Cluster::ShadowConnectionId(0,0); } + bool isLocal() const { return !isShadow(); } + bool getClosed() const { return isClosed; } + // self-delivery of intercepted extension points. void deliver(framing::AMQFrame& f); void deliverClosed(); - void deliverDoOutput(); + void deliverDoOutput(size_t requested); void dirtyClose(); + Cluster& getCluster() { return cluster; } + private: struct NullConnectionHandler : public qpid::sys::ConnectionOutputHandler { void close() {} void send(framing::AMQFrame&) {} - void doOutput() {} void activateOutput() {} }; - bool isShadow() { return shadowId != Cluster::ShadowConnectionId(0,0); } - // Functions to intercept to Connection extension points. void received(framing::AMQFrame&); void closed(); bool doOutput(); + void activateOutput(); + + void sendDoOutput(); boost::function<void (framing::AMQFrame&)> receivedNext; boost::function<void ()> closedNext; - boost::function<bool ()> doOutputNext; boost::intrusive_ptr<broker::Connection> connection; Cluster& cluster; NullConnectionHandler discardHandler; bool isClosed; Cluster::ShadowConnectionId shadowId; + WriteEstimate writeEstimate; + OutputInterceptor output; }; }} // namespace qpid::cluster -#endif /*!QPID_CLUSTER_CONNECTIONPLUGIN_H*/ - +#endif /*!QPID_CLUSTER_CONNECTIONINTERCEPTOR_H*/ diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp new file mode 100644 index 0000000000..84d3a6ad69 --- /dev/null +++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -0,0 +1,106 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "OutputInterceptor.h" +#include "ConnectionInterceptor.h" +#include "qpid/framing/ClusterConnectionDoOutputBody.h" +#include "qpid/framing/AMQFrame.h" +#include <boost/current_function.hpp> + + +namespace qpid { +namespace cluster { + +using namespace framing; + +OutputInterceptor::OutputInterceptor(ConnectionInterceptor& p, sys::ConnectionOutputHandler& h) + : parent(p), next(h), sent(), moreOutput(), doingOutput() +{} + +void OutputInterceptor::send(framing::AMQFrame& f) { + Locker l(lock); + next.send(f); + sent += f.size(); +} + +void OutputInterceptor::activateOutput() { + Locker l(lock); + moreOutput = true; + sendDoOutput(); +} + +// Called in write thread when the IO layer has no more data to write. +// We do nothing in the write thread, we run doOutput only on delivery +// of doOutput requests. +bool OutputInterceptor::doOutput() { + return false; +} + +// Delivery of doOutput allows us to run the real connection doOutput() +// which stocks up the write buffers with data. +// +void OutputInterceptor::deliverDoOutput(size_t requested) { + if (parent.getClosed()) return; + + Locker l(lock); + size_t buf = next.getBuffered(); + if (parent.isLocal()) + writeEstimate.delivered(sent, buf); // Update the estimate. + + // Run the real doOutput() till we have added the requested data or there's nothing to output. + sent = 0; + do { + sys::Mutex::ScopedUnlock u(lock); + moreOutput = doOutputNext(); // Calls send() + } while (sent < requested && moreOutput); + sent += buf; // Include buffered data in the sent total. + + QPID_LOG(trace, "Delivered doOutput: requested=" << requested << " output=" << sent << " more=" << moreOutput); + + if (parent.isLocal() && moreOutput) + sendDoOutput(); + else + doingOutput = false; +} + +void OutputInterceptor::startDoOutput() { + if (!doingOutput) + sendDoOutput(); +} + +// Send a doOutput request if one is not already in flight. +void OutputInterceptor::sendDoOutput() { + // Call with lock held. + if (parent.isShadow() || parent.getClosed()) + return; + + doingOutput = true; + size_t request = writeEstimate.sending(getBuffered()); + + // Note we may send 0 size request if there's more than 2*estimate in the buffer. + // Send it anyway to keep the doOutput chain going until we are sure there's no more output + // (in deliverDoOutput) + // + parent.getCluster().send(AMQFrame(in_place<ClusterConnectionDoOutputBody>( + framing::ProtocolVersion(), request)), &parent); + QPID_LOG(trace, &parent << "Send doOutput request for " << request); +} + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/OutputInterceptor.h b/cpp/src/qpid/cluster/OutputInterceptor.h new file mode 100644 index 0000000000..b39f2a2be9 --- /dev/null +++ b/cpp/src/qpid/cluster/OutputInterceptor.h @@ -0,0 +1,74 @@ +#ifndef QPID_CLUSTER_OUTPUTINTERCEPTOR_H +#define QPID_CLUSTER_OUTPUTINTERCEPTOR_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "WriteEstimate.h" +#include "qpid/sys/ConnectionOutputHandler.h" +#include "qpid/broker/ConnectionFactory.h" +#include <boost/function.hpp> + +namespace qpid { +namespace framing { class AMQFrame; } +namespace cluster { + +class ConnectionInterceptor; + +/** + * Interceptor for connection OutputHandler, manages outgoing message replication. + */ +class OutputInterceptor : public sys::ConnectionOutputHandler { + public: + OutputInterceptor(ConnectionInterceptor& p, sys::ConnectionOutputHandler& h); + + // sys::ConnectionOutputHandler functions + void send(framing::AMQFrame& f); + void activateOutput(); + void close() { Locker l(lock); next.close(); } + size_t getBuffered() const { Locker l(lock); return next.getBuffered(); } + + // Delivery point for doOutput requests. + void deliverDoOutput(size_t requested); + // Intercept doOutput requests on Connection. + bool doOutput(); + + boost::function<bool ()> doOutputNext; + + ConnectionInterceptor& parent; + + private: + typedef sys::Mutex::ScopedLock Locker; + + void startDoOutput(); + void sendDoOutput(); + + mutable sys::Mutex lock; + sys::ConnectionOutputHandler& next; + size_t sent; + WriteEstimate writeEstimate; + bool moreOutput; + bool doingOutput; +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_OUTPUTINTERCEPTOR_H*/ diff --git a/cpp/src/qpid/cluster/WriteEstimate.cpp b/cpp/src/qpid/cluster/WriteEstimate.cpp new file mode 100644 index 0000000000..81131be437 --- /dev/null +++ b/cpp/src/qpid/cluster/WriteEstimate.cpp @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "WriteEstimate.h" +#include "qpid/log/Statement.h" +#include <boost/current_function.hpp> + +namespace qpid { +namespace cluster { + +WriteEstimate::WriteEstimate(size_t initial) + : growing(true), estimate(initial) {} + +size_t WriteEstimate::sending(size_t buffered) { + // We want to send a doOutput request for enough data such + // that if estimate bytes are written before it is self + // delivered then what is left in the buffer plus the doOutput + // request will be estimate bytes. + + size_t predictLeft = (buffered > estimate) ? buffered - estimate : 0; + size_t request = (estimate > predictLeft) ? estimate - predictLeft : 0; + return request; +} + +size_t pad(size_t value) { return value + value/2; } + +void WriteEstimate::delivered(size_t sent, size_t buffered) { + size_t wrote = sent > buffered ? sent - buffered : 0; + if (wrote == 0) // No change + return; + if (buffered > 0) { // Buffer was over-stocked, we wrote to capacity. + growing = false; + estimate = pad(wrote); // Estimate at 1.5 write for padding. + } + else if (wrote > estimate) { // Wrote everything, buffer was under-stocked + if (growing) + estimate = std::max(estimate*2, pad(wrote)); // Grow quickly if we have not yet seen an over-stock. + else + estimate = pad(wrote); + } +} + +}} // namespace qpid::cluster + + diff --git a/cpp/src/qpid/cluster/WriteEstimate.h b/cpp/src/qpid/cluster/WriteEstimate.h new file mode 100644 index 0000000000..01ab2a3e34 --- /dev/null +++ b/cpp/src/qpid/cluster/WriteEstimate.h @@ -0,0 +1,64 @@ +#ifndef QPID_CLUSTER_WRITEESTIMATE_H +#define QPID_CLUSTER_WRITEESTIMATE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Mutex.h" + +namespace qpid { +namespace cluster { + +/** + * Estimate the amount of data that a connection can write between sending + * a doOutput notice and re-receiving it. + * + * The goal is to avoid ever write-idling the connection by sending + * the next doOutput request as soon as we've processed the previous + * one, such that data generated by the previous request will keep the + * writer busy till the next one is delivered. + * + */ +class WriteEstimate +{ + public: + WriteEstimate(size_t initial=4096); + + /** About to send a doOutput request. + * Update estimation state and return size for next request. + */ + size_t sending(size_t buffered); + + /** + * doOutput request just delivered, not yet executed. Update the estimate. + * and estimate how much data to request in the next onOutput + * request. 0 means don't send an onOutput request. + */ + void delivered(size_t sent, size_t buffered); + + private: + bool growing; + size_t estimate; +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_WRITEESTIMATE_H*/ diff --git a/cpp/src/qpid/sys/ConnectionOutputHandler.h b/cpp/src/qpid/sys/ConnectionOutputHandler.h index 5a60ae4998..de0bef3630 100644 --- a/cpp/src/qpid/sys/ConnectionOutputHandler.h +++ b/cpp/src/qpid/sys/ConnectionOutputHandler.h @@ -34,6 +34,7 @@ class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler, pub { public: virtual void close() = 0; + virtual size_t getBuffered() const { return 0; } }; }} diff --git a/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h b/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h new file mode 100644 index 0000000000..c2da8edc2c --- /dev/null +++ b/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h @@ -0,0 +1,54 @@ +#ifndef QPID_SYS_CONNECTIONOUTPUTHANDLERPTR_H +#define QPID_SYS_CONNECTIONOUTPUTHANDLERPTR_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ConnectionOutputHandler.h" + +namespace qpid { +namespace sys { + +/** + * A ConnectionOutputHandler that delegates to another + * ConnectionOutputHandler. Allows the "real" ConnectionOutputHandler + * to be changed modified without updating all the pointers/references + * using the ConnectionOutputHandlerPtr + */ +class ConnectionOutputHandlerPtr : public ConnectionOutputHandler +{ + public: + ConnectionOutputHandlerPtr(ConnectionOutputHandler* p) : next(p) {} + void set(ConnectionOutputHandler* p) { next = p; } + ConnectionOutputHandler* get() { return next; } + const ConnectionOutputHandler* get() const { return next; } + + void close() { next->close(); } + size_t getBuffered() const { return next->getBuffered(); } + void activateOutput() { next->activateOutput(); } + void send(framing::AMQFrame& f) { next->send(f); } + + private: + ConnectionOutputHandler* next; +}; +}} // namespace qpid::sys + +#endif /*!QPID_SYS_CONNECTIONOUTPUTHANDLERPTR_H*/ |