diff options
author | Alan Conway <aconway@apache.org> | 2008-08-21 18:04:18 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-08-21 18:04:18 +0000 |
commit | 299f256ef76ecbec842c98260bdc3c7671b0de91 (patch) | |
tree | 7068617975bea8b11eb76982f833bc20a2d76e4a | |
parent | 9802ade0355cf8d5f70bcaeea64aad86e6f8dc0a (diff) | |
download | qpid-python-299f256ef76ecbec842c98260bdc3c7671b0de91.tar.gz |
Pre-buffering output strategy for cluster.
Additional hooks in broker code, should not affect standalone broker.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@687813 13f79535-47bb-0310-9956-ffa450edef68
20 files changed, 466 insertions, 87 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 172888cfb8..039c35ed4e 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -546,6 +546,7 @@ nobase_include_HEADERS = \ qpid/sys/ConnectionInputHandler.h \ qpid/sys/ConnectionInputHandlerFactory.h \ qpid/sys/ConnectionOutputHandler.h \ + qpid/sys/ConnectionOutputHandlerPtr.h \ qpid/sys/DeletionManager.h \ qpid/sys/Dispatcher.h \ qpid/sys/IOHandle.h \ diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk index aa3644785b..934ec0174b 100644 --- a/qpid/cpp/src/cluster.mk +++ b/qpid/cpp/src/cluster.mk @@ -19,7 +19,11 @@ libqpidcluster_la_SOURCES = \ qpid/cluster/ShadowConnectionOutputHandler.h \ qpid/cluster/PollableCondition.h \ qpid/cluster/PollableCondition.cpp \ - qpid/cluster/PollableQueue.h + qpid/cluster/PollableQueue.h \ + qpid/cluster/WriteEstimate.h \ + qpid/cluster/WriteEstimate.cpp \ + qpid/cluster/OutputInterceptor.h \ + qpid/cluster/OutputInterceptor.cpp libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp index a3692911b2..0b996dedd2 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp +++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp @@ -30,7 +30,7 @@ using sys::Mutex; Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient) : frameQueueClosed(false), output(o), connection(new broker::Connection(this, broker, id, _isClient)), - identifier(id), initialized(false), isClient(_isClient) {} + identifier(id), initialized(false), isClient(_isClient), buffered(0) {} size_t Connection::decode(const char* buffer, size_t size) { framing::Buffer in(const_cast<char*>(buffer), size); @@ -53,7 +53,7 @@ size_t Connection::decode(const char* buffer, size_t size) { bool Connection::canEncode() { if (!frameQueueClosed) connection->doOutput(); - Mutex::ScopedLock l(frameQueueLock); + Mutex::ScopedLock l(frameQueueLock); return (!isClient && !initialized) || !frameQueue.empty(); } @@ -71,10 +71,12 @@ size_t Connection::encode(const char* buffer, size_t size) { initialized = true; QPID_LOG(trace, "SENT " << identifier << " INIT(" << pi << ")"); } - while (!frameQueue.empty() && (frameQueue.front().size() <= out.available())) { + size_t frameSize=0; + while (!frameQueue.empty() && ((frameSize=frameQueue.front().size()) <= out.available())) { frameQueue.front().encode(out); QPID_LOG(trace, "SENT [" << identifier << "]: " << frameQueue.front()); - frameQueue.pop(); + frameQueue.pop_front(); + buffered -= frameSize; } assert(frameQueue.empty() || frameQueue.front().size() <= size); if (!frameQueue.empty() && frameQueue.front().size() > size) @@ -98,7 +100,8 @@ void Connection::send(framing::AMQFrame& f) { { Mutex::ScopedLock l(frameQueueLock); if (!frameQueueClosed) - frameQueue.push(f); + frameQueue.push_back(f); + buffered += f.size(); } activateOutput(); } @@ -107,4 +110,9 @@ framing::ProtocolVersion Connection::getVersion() const { return framing::ProtocolVersion(0,10); } +size_t Connection::getBuffered() const { + Mutex::ScopedLock l(frameQueueLock); + return buffered; +} + }} // namespace qpid::amqp_0_10 diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.h b/qpid/cpp/src/qpid/amqp_0_10/Connection.h index b707031789..f6fb87f928 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/Connection.h +++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.h @@ -26,7 +26,7 @@ #include "qpid/sys/Mutex.h" #include "qpid/broker/Connection.h" #include <boost/intrusive_ptr.hpp> -#include <queue> +#include <deque> #include <memory> namespace qpid { @@ -36,7 +36,9 @@ namespace amqp_0_10 { class Connection : public sys::ConnectionCodec, public sys::ConnectionOutputHandler { - std::queue<framing::AMQFrame> frameQueue; + typedef std::deque<framing::AMQFrame> FrameQueue; + + FrameQueue frameQueue; bool frameQueueClosed; mutable sys::Mutex frameQueueLock; sys::OutputControl& output; @@ -44,7 +46,8 @@ class Connection : public sys::ConnectionCodec, std::string identifier; bool initialized; bool isClient; - + size_t buffered; + public: Connection(sys::OutputControl&, broker::Broker&, const std::string& id, bool isClient = false); size_t decode(const char* buffer, size_t size); @@ -56,6 +59,7 @@ class Connection : public sys::ConnectionCodec, void close(); // closing from this end. void send(framing::AMQFrame&); framing::ProtocolVersion getVersion() const; + size_t getBuffered() const; }; }} // namespace qpid::amqp_0_10 diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index ab18d1f035..d65dbaeec7 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -79,7 +79,7 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std void Connection::requestIOProcessing(boost::function0<void> callback) { ioCallback = callback; - out->activateOutput(); + out.activateOutput(); } Connection::~Connection() @@ -178,7 +178,6 @@ void Connection::closedImpl(){ // Physically closed, suspend open sessions. try { while (!channels.empty()) ptr_map_ptr(channels.begin())->handleDetach(); - // FIXME aconway 2008-07-15: exclusive is per-session not per-connection in 0-10. while (!exclusiveQueues.empty()) { Queue::shared_ptr q(exclusiveQueues.front()); q->releaseExclusiveOwnership(); @@ -245,7 +244,7 @@ Manageable::status_t Connection::ManagementMethod(uint32_t methodId, Args&) case management::Connection::METHOD_CLOSE : mgmtClosing = true; if (mgmtObject != 0) mgmtObject->set_closing(1); - out->activateOutput(); + out.activateOutput(); status = Manageable::STATUS_OK; break; } diff --git a/qpid/cpp/src/qpid/broker/ConnectionState.h b/qpid/cpp/src/qpid/broker/ConnectionState.h index c9cf6ece8d..97055f8b2e 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionState.h +++ b/qpid/cpp/src/qpid/broker/ConnectionState.h @@ -24,7 +24,7 @@ #include <vector> #include "qpid/sys/AggregateOutput.h" -#include "qpid/sys/ConnectionOutputHandler.h" +#include "qpid/sys/ConnectionOutputHandlerPtr.h" #include "qpid/framing/ProtocolVersion.h" #include "qpid/management/Manageable.h" #include "Broker.h" @@ -34,11 +34,14 @@ namespace broker { class ConnectionState : public ConnectionToken, public management::Manageable { + protected: + sys::ConnectionOutputHandlerPtr out; + public: - ConnectionState(qpid::sys::ConnectionOutputHandler* o, Broker& b) : + ConnectionState(qpid::sys::ConnectionOutputHandler* o, Broker& b) : + out(o), broker(b), - outputTasks(*o), - out(o), + outputTasks(out), framemax(65535), heartbeat(0), stagingThreshold(broker.getStagingThreshold()) @@ -67,14 +70,13 @@ class ConnectionState : public ConnectionToken, public management::Manageable //contained output tasks sys::AggregateOutput outputTasks; - sys::ConnectionOutputHandler& getOutput() const { return *out; } + sys::ConnectionOutputHandlerPtr& getOutput() { return out; } framing::ProtocolVersion getVersion() const { return version; } - void setOutputHandler(qpid::sys::ConnectionOutputHandler* o) { out = o; } + void setOutputHandler(qpid::sys::ConnectionOutputHandler* o) { out.set(o); } protected: framing::ProtocolVersion version; - sys::ConnectionOutputHandler* out; uint32_t framemax; uint16_t heartbeat; uint64_t stagingThreshold; diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 05ab9148b5..75ce19e835 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -25,12 +25,14 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ClusterNotifyBody.h" #include "qpid/framing/ClusterConnectionCloseBody.h" +#include "qpid/framing/ClusterConnectionDoOutputBody.h" #include "qpid/log/Statement.h" #include "qpid/memory.h" #include "qpid/shared_ptr.h" #include <boost/bind.hpp> #include <boost/cast.hpp> +#include <boost/current_function.hpp> #include <algorithm> #include <iterator> #include <map> @@ -76,11 +78,6 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : cpg.join(name); notify(); - // FIXME aconway 2008-08-11: can we remove this loop? - // Dispatch till we show up in the cluster map. - while (empty()) - cpg.dispatchOne(); - // Start dispatching from the poller. cpgDispatchHandle.startWatch(poller); deliverQueue.start(poller); @@ -97,9 +94,8 @@ Cluster::~Cluster() { std::for_each(localConnectionSet.begin(), localConnectionSet.end(), boost::bind(&ConnectionInterceptor::dirtyClose, _1)); } -// local connection initializes plugins void Cluster::initialize(broker::Connection& c) { - bool isLocal = &c.getOutput() != &shadowOut; + bool isLocal = c.getOutput().get() != &shadowOut; if (isLocal) localConnectionSet.insert(new ConnectionInterceptor(c, *this)); } @@ -107,10 +103,8 @@ void Cluster::initialize(broker::Connection& c) { void Cluster::leave() { Mutex::ScopedLock l(lock); if (!broker) return; // Already left. - // At this point the poller has already been shut down so - // no dispatches can occur thru the cpgDispatchHandle. - // - // FIXME aconway 2008-08-11: assert this is the cae. + // Leave is called by from Broker destructor after the poller has + // been shut down. No dispatches can occur. QPID_LOG(debug, "Leaving cluster " << *this); cpg.leave(name); @@ -173,13 +167,6 @@ Cluster::MemberList Cluster::getMembers() const { return result; } -// ################ HERE - leaking shadow connections. -// FIXME aconway 2008-08-11: revisit memory management for shadow -// connections, what if the Connection is closed other than via -// disconnect? Dangling pointer in shadow map. Use ptr_map for shadow -// map, add deleted state to ConnectionInterceptor? Interceptors need -// to know about map? Check how Connections can be deleted. - ConnectionInterceptor* Cluster::getShadowConnection(const Cpg::Id& member, void* remotePtr) { ShadowConnectionId id(member, remotePtr); ShadowConnectionMap::iterator i = shadowConnectionMap.find(id); @@ -274,7 +261,8 @@ void Cluster::handleMethod(Id from, ConnectionInterceptor* connection, AMQMethod break; } case CLUSTER_CONNECTION_DO_OUTPUT_METHOD_ID: { - connection->deliverDoOutput(); + ClusterConnectionDoOutputBody& doOutput = static_cast<ClusterConnectionDoOutputBody&>(method); + connection->deliverDoOutput(doOutput.getBytes()); break; } default: @@ -309,9 +297,8 @@ void Cluster::dispatch(sys::DispatchHandle& h) { void Cluster::disconnect(sys::DispatchHandle& h) { h.stopWatch(); - // FIXME aconway 2008-08-11: error handling if we are disconnected. - // Kill the broker? - assert(0); + QPID_LOG(critical, "Disconnected from cluster, shutting down"); + broker.shutdown(); } }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp b/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp index c13651eccb..efcab1b731 100644 --- a/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp +++ b/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp @@ -22,6 +22,8 @@ #include "qpid/framing/ClusterConnectionCloseBody.h" #include "qpid/framing/ClusterConnectionDoOutputBody.h" #include "qpid/framing/AMQFrame.h" +#include <boost/current_function.hpp> + namespace qpid { namespace cluster { @@ -32,24 +34,27 @@ template <class T, class U, class V> void shift(T& a, U& b, const V& c) { a = b; ConnectionInterceptor::ConnectionInterceptor( broker::Connection& conn, Cluster& clust, Cluster::ShadowConnectionId shadowId_) - : connection(&conn), cluster(clust), isClosed(false), shadowId(shadowId_) + : connection(&conn), cluster(clust), isClosed(false), shadowId(shadowId_), output(*this, *conn.getOutput().get()) { connection->addFinalizer(boost::bind(operator delete, this)); + connection->setOutputHandler(&output), // Attach my functions to Connection extension points. shift(receivedNext, connection->receivedFn, boost::bind(&ConnectionInterceptor::received, this, _1)); shift(closedNext, connection->closedFn, boost::bind(&ConnectionInterceptor::closed, this)); - shift(doOutputNext, connection->doOutputFn, boost::bind(&ConnectionInterceptor::doOutput, this)); + shift(output.doOutputNext, connection->doOutputFn, boost::bind(&OutputInterceptor::doOutput, &output)); } ConnectionInterceptor::~ConnectionInterceptor() { assert(connection == 0); } +// Forward all received frames to the cluster, continue handling on delivery. void ConnectionInterceptor::received(framing::AMQFrame& f) { if (isClosed) return; cluster.send(f, this); } +// Continue normal handling of delivered frames. void ConnectionInterceptor::deliver(framing::AMQFrame& f) { receivedNext(f); } @@ -81,28 +86,17 @@ void ConnectionInterceptor::deliverClosed() { } void ConnectionInterceptor::dirtyClose() { - // Not closed via cluster self-delivery but closed locally. - // Used for dirty cluster shutdown where active connections - // must be cleaned up. + // Not closed via cluster self-delivery but closed locally. Used + // when local broker is shut down without a clean cluster shutdown. + // Release the connection, it will delete this. connection = 0; } -bool ConnectionInterceptor::doOutput() { - // FIXME aconway 2008-08-15: this is not correct. - // Run in write threads so order of execution of doOutput is not determinate. - // Will only work reliably for in single-consumer tests. - - if (connection->hasOutput()) { - cluster.send(AMQFrame(in_place<ClusterConnectionDoOutputBody>()), this); - return doOutputNext(); - } - return false; -} - -void ConnectionInterceptor::deliverDoOutput() { - // FIXME aconway 2008-08-15: see comment in doOutput. - if (isShadow()) - doOutputNext(); +// Delivery of doOutput allows us to run the real connection doOutput() +// which stocks up the write buffers with data. +// +void ConnectionInterceptor::deliverDoOutput(size_t requested) { + output.deliverDoOutput(requested); } }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h b/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h index 370572bd9d..9216921067 100644 --- a/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h +++ b/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h @@ -1,5 +1,5 @@ -#ifndef QPID_CLUSTER_CONNECTIONPLUGIN_H -#define QPID_CLUSTER_CONNECTIONPLUGIN_H +#ifndef QPID_CLUSTER_CONNECTIONINTERCEPTOR_H +#define QPID_CLUSTER_CONNECTIONINTERCEPTOR_H /* * @@ -23,6 +23,8 @@ */ #include "Cluster.h" +#include "WriteEstimate.h" +#include "OutputInterceptor.h" #include "qpid/broker/Connection.h" #include "qpid/sys/ConnectionOutputHandler.h" @@ -41,42 +43,46 @@ class ConnectionInterceptor { Cluster::ShadowConnectionId getShadowId() const { return shadowId; } - bool isLocal() const { return shadowId == Cluster::ShadowConnectionId(0,0); } - + bool isShadow() const { return shadowId != Cluster::ShadowConnectionId(0,0); } + bool isLocal() const { return !isShadow(); } + bool getClosed() const { return isClosed; } + // self-delivery of intercepted extension points. void deliver(framing::AMQFrame& f); void deliverClosed(); - void deliverDoOutput(); + void deliverDoOutput(size_t requested); void dirtyClose(); + Cluster& getCluster() { return cluster; } + private: struct NullConnectionHandler : public qpid::sys::ConnectionOutputHandler { void close() {} void send(framing::AMQFrame&) {} - void doOutput() {} void activateOutput() {} }; - bool isShadow() { return shadowId != Cluster::ShadowConnectionId(0,0); } - // Functions to intercept to Connection extension points. void received(framing::AMQFrame&); void closed(); bool doOutput(); + void activateOutput(); + + void sendDoOutput(); boost::function<void (framing::AMQFrame&)> receivedNext; boost::function<void ()> closedNext; - boost::function<bool ()> doOutputNext; boost::intrusive_ptr<broker::Connection> connection; Cluster& cluster; NullConnectionHandler discardHandler; bool isClosed; Cluster::ShadowConnectionId shadowId; + WriteEstimate writeEstimate; + OutputInterceptor output; }; }} // namespace qpid::cluster -#endif /*!QPID_CLUSTER_CONNECTIONPLUGIN_H*/ - +#endif /*!QPID_CLUSTER_CONNECTIONINTERCEPTOR_H*/ diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp new file mode 100644 index 0000000000..84d3a6ad69 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -0,0 +1,106 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "OutputInterceptor.h" +#include "ConnectionInterceptor.h" +#include "qpid/framing/ClusterConnectionDoOutputBody.h" +#include "qpid/framing/AMQFrame.h" +#include <boost/current_function.hpp> + + +namespace qpid { +namespace cluster { + +using namespace framing; + +OutputInterceptor::OutputInterceptor(ConnectionInterceptor& p, sys::ConnectionOutputHandler& h) + : parent(p), next(h), sent(), moreOutput(), doingOutput() +{} + +void OutputInterceptor::send(framing::AMQFrame& f) { + Locker l(lock); + next.send(f); + sent += f.size(); +} + +void OutputInterceptor::activateOutput() { + Locker l(lock); + moreOutput = true; + sendDoOutput(); +} + +// Called in write thread when the IO layer has no more data to write. +// We do nothing in the write thread, we run doOutput only on delivery +// of doOutput requests. +bool OutputInterceptor::doOutput() { + return false; +} + +// Delivery of doOutput allows us to run the real connection doOutput() +// which stocks up the write buffers with data. +// +void OutputInterceptor::deliverDoOutput(size_t requested) { + if (parent.getClosed()) return; + + Locker l(lock); + size_t buf = next.getBuffered(); + if (parent.isLocal()) + writeEstimate.delivered(sent, buf); // Update the estimate. + + // Run the real doOutput() till we have added the requested data or there's nothing to output. + sent = 0; + do { + sys::Mutex::ScopedUnlock u(lock); + moreOutput = doOutputNext(); // Calls send() + } while (sent < requested && moreOutput); + sent += buf; // Include buffered data in the sent total. + + QPID_LOG(trace, "Delivered doOutput: requested=" << requested << " output=" << sent << " more=" << moreOutput); + + if (parent.isLocal() && moreOutput) + sendDoOutput(); + else + doingOutput = false; +} + +void OutputInterceptor::startDoOutput() { + if (!doingOutput) + sendDoOutput(); +} + +// Send a doOutput request if one is not already in flight. +void OutputInterceptor::sendDoOutput() { + // Call with lock held. + if (parent.isShadow() || parent.getClosed()) + return; + + doingOutput = true; + size_t request = writeEstimate.sending(getBuffered()); + + // Note we may send 0 size request if there's more than 2*estimate in the buffer. + // Send it anyway to keep the doOutput chain going until we are sure there's no more output + // (in deliverDoOutput) + // + parent.getCluster().send(AMQFrame(in_place<ClusterConnectionDoOutputBody>( + framing::ProtocolVersion(), request)), &parent); + QPID_LOG(trace, &parent << "Send doOutput request for " << request); +} + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.h b/qpid/cpp/src/qpid/cluster/OutputInterceptor.h new file mode 100644 index 0000000000..b39f2a2be9 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.h @@ -0,0 +1,74 @@ +#ifndef QPID_CLUSTER_OUTPUTINTERCEPTOR_H +#define QPID_CLUSTER_OUTPUTINTERCEPTOR_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "WriteEstimate.h" +#include "qpid/sys/ConnectionOutputHandler.h" +#include "qpid/broker/ConnectionFactory.h" +#include <boost/function.hpp> + +namespace qpid { +namespace framing { class AMQFrame; } +namespace cluster { + +class ConnectionInterceptor; + +/** + * Interceptor for connection OutputHandler, manages outgoing message replication. + */ +class OutputInterceptor : public sys::ConnectionOutputHandler { + public: + OutputInterceptor(ConnectionInterceptor& p, sys::ConnectionOutputHandler& h); + + // sys::ConnectionOutputHandler functions + void send(framing::AMQFrame& f); + void activateOutput(); + void close() { Locker l(lock); next.close(); } + size_t getBuffered() const { Locker l(lock); return next.getBuffered(); } + + // Delivery point for doOutput requests. + void deliverDoOutput(size_t requested); + // Intercept doOutput requests on Connection. + bool doOutput(); + + boost::function<bool ()> doOutputNext; + + ConnectionInterceptor& parent; + + private: + typedef sys::Mutex::ScopedLock Locker; + + void startDoOutput(); + void sendDoOutput(); + + mutable sys::Mutex lock; + sys::ConnectionOutputHandler& next; + size_t sent; + WriteEstimate writeEstimate; + bool moreOutput; + bool doingOutput; +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_OUTPUTINTERCEPTOR_H*/ diff --git a/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp b/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp new file mode 100644 index 0000000000..81131be437 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "WriteEstimate.h" +#include "qpid/log/Statement.h" +#include <boost/current_function.hpp> + +namespace qpid { +namespace cluster { + +WriteEstimate::WriteEstimate(size_t initial) + : growing(true), estimate(initial) {} + +size_t WriteEstimate::sending(size_t buffered) { + // We want to send a doOutput request for enough data such + // that if estimate bytes are written before it is self + // delivered then what is left in the buffer plus the doOutput + // request will be estimate bytes. + + size_t predictLeft = (buffered > estimate) ? buffered - estimate : 0; + size_t request = (estimate > predictLeft) ? estimate - predictLeft : 0; + return request; +} + +size_t pad(size_t value) { return value + value/2; } + +void WriteEstimate::delivered(size_t sent, size_t buffered) { + size_t wrote = sent > buffered ? sent - buffered : 0; + if (wrote == 0) // No change + return; + if (buffered > 0) { // Buffer was over-stocked, we wrote to capacity. + growing = false; + estimate = pad(wrote); // Estimate at 1.5 write for padding. + } + else if (wrote > estimate) { // Wrote everything, buffer was under-stocked + if (growing) + estimate = std::max(estimate*2, pad(wrote)); // Grow quickly if we have not yet seen an over-stock. + else + estimate = pad(wrote); + } +} + +}} // namespace qpid::cluster + + diff --git a/qpid/cpp/src/qpid/cluster/WriteEstimate.h b/qpid/cpp/src/qpid/cluster/WriteEstimate.h new file mode 100644 index 0000000000..01ab2a3e34 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/WriteEstimate.h @@ -0,0 +1,64 @@ +#ifndef QPID_CLUSTER_WRITEESTIMATE_H +#define QPID_CLUSTER_WRITEESTIMATE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Mutex.h" + +namespace qpid { +namespace cluster { + +/** + * Estimate the amount of data that a connection can write between sending + * a doOutput notice and re-receiving it. + * + * The goal is to avoid ever write-idling the connection by sending + * the next doOutput request as soon as we've processed the previous + * one, such that data generated by the previous request will keep the + * writer busy till the next one is delivered. + * + */ +class WriteEstimate +{ + public: + WriteEstimate(size_t initial=4096); + + /** About to send a doOutput request. + * Update estimation state and return size for next request. + */ + size_t sending(size_t buffered); + + /** + * doOutput request just delivered, not yet executed. Update the estimate. + * and estimate how much data to request in the next onOutput + * request. 0 means don't send an onOutput request. + */ + void delivered(size_t sent, size_t buffered); + + private: + bool growing; + size_t estimate; +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_WRITEESTIMATE_H*/ diff --git a/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h b/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h index 5a60ae4998..de0bef3630 100644 --- a/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h +++ b/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h @@ -34,6 +34,7 @@ class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler, pub { public: virtual void close() = 0; + virtual size_t getBuffered() const { return 0; } }; }} diff --git a/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h b/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h new file mode 100644 index 0000000000..c2da8edc2c --- /dev/null +++ b/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h @@ -0,0 +1,54 @@ +#ifndef QPID_SYS_CONNECTIONOUTPUTHANDLERPTR_H +#define QPID_SYS_CONNECTIONOUTPUTHANDLERPTR_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ConnectionOutputHandler.h" + +namespace qpid { +namespace sys { + +/** + * A ConnectionOutputHandler that delegates to another + * ConnectionOutputHandler. Allows the "real" ConnectionOutputHandler + * to be changed modified without updating all the pointers/references + * using the ConnectionOutputHandlerPtr + */ +class ConnectionOutputHandlerPtr : public ConnectionOutputHandler +{ + public: + ConnectionOutputHandlerPtr(ConnectionOutputHandler* p) : next(p) {} + void set(ConnectionOutputHandler* p) { next = p; } + ConnectionOutputHandler* get() { return next; } + const ConnectionOutputHandler* get() const { return next; } + + void close() { next->close(); } + size_t getBuffered() const { return next->getBuffered(); } + void activateOutput() { next->activateOutput(); } + void send(framing::AMQFrame& f) { next->send(f); } + + private: + ConnectionOutputHandler* next; +}; +}} // namespace qpid::sys + +#endif /*!QPID_SYS_CONNECTIONOUTPUTHANDLERPTR_H*/ diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp index 49c5264990..8f3927732d 100644 --- a/qpid/cpp/src/tests/cluster_test.cpp +++ b/qpid/cpp/src/tests/cluster_test.cpp @@ -78,7 +78,7 @@ ClusterFixture::ClusterFixture(size_t n) : name(Uuid(true).str()) { ::sleep(1); --retry; } - BOOST_CHECK_EQUAL(n, getGlobalCluster()->size()); + BOOST_REQUIRE_EQUAL(n, getGlobalCluster()->size()); } void ClusterFixture::add() { diff --git a/qpid/cpp/src/tests/consume.cpp b/qpid/cpp/src/tests/consume.cpp index fecf6bb315..c20a738755 100644 --- a/qpid/cpp/src/tests/consume.cpp +++ b/qpid/cpp/src/tests/consume.cpp @@ -42,13 +42,15 @@ struct Args : public qpid::TestOptions { uint count; uint ack; string queue; - + bool declare; + Args() : count(0), ack(1) { addOptions() ("count", optValue(count, "N"), "number of messages to publish") ("ack-frequency", optValue(ack, "N"), "ack every N messages (0 means use no-ack mode)") - ("queue", optValue(queue, "<queue name>"), "queue to consume from"); + ("queue", optValue(queue, "<queue name>"), "queue to consume from") + ("declare", optValue(declare), "declare the queue"); } }; @@ -67,7 +69,8 @@ struct Client void consume() { - + if (opts.declare) + session.queueDeclare(opts.queue); SubscriptionManager subs(session); LocalQueue lq(AckPolicy(opts.ack)); subs.setAcceptMode(opts.ack > 0 ? 0 : 1); @@ -77,7 +80,7 @@ struct Client Message msg; for (size_t i = 0; i < opts.count; ++i) { msg=lq.pop(); - std::cout << "Received: " << msg.getMessageProperties().getCorrelationId() << std::endl; + QPID_LOG(info, "Received: " << msg.getMessageProperties().getCorrelationId()); } if (opts.ack != 0) subs.getAckPolicy().ackOutstanding(session); // Cumulative ack for final batch. diff --git a/qpid/cpp/src/tests/latencytest.cpp b/qpid/cpp/src/tests/latencytest.cpp index e1a6f156a5..6c3fdd23bd 100644 --- a/qpid/cpp/src/tests/latencytest.cpp +++ b/qpid/cpp/src/tests/latencytest.cpp @@ -44,6 +44,7 @@ struct Args : public qpid::TestOptions { uint size; uint count; uint rate; + bool sync; uint reportFrequency; uint timeLimit; uint queues; @@ -65,6 +66,7 @@ struct Args : public qpid::TestOptions { ("queues", optValue(queues, "N"), "number of queues") ("count", optValue(count, "N"), "number of messages to send") ("rate", optValue(rate, "N"), "target message rate (causes count to be ignored)") + ("sync", optValue(sync), "send messages synchronously") ("report-frequency", optValue(reportFrequency, "N"), "number of milliseconds to wait between reports (ignored unless rate specified)") ("time-limit", optValue(timeLimit, "N"), @@ -143,6 +145,7 @@ class Sender : public Client void sendByRate(); void sendByCount(); Receiver& receiver; + const string data; public: Sender(const string& queue, Receiver& receiver); void test(); @@ -285,7 +288,7 @@ void Stats::reset() totalLatency = maxLatency = minLatency = 0; } -Sender::Sender(const string& q, Receiver& receiver) : Client(q), receiver(receiver) {} +Sender::Sender(const string& q, Receiver& receiver) : Client(q), receiver(receiver), data(generateData(opts.size)) {} void Sender::test() { @@ -295,7 +298,7 @@ void Sender::test() void Sender::sendByCount() { - Message msg(generateData(opts.size), queue); + Message msg(data, queue); if (opts.durable) { msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); } @@ -303,15 +306,15 @@ void Sender::sendByCount() for (uint i = 0; i < opts.count; i++) { uint64_t sentAt(current_time()); msg.getDeliveryProperties().setTimestamp(sentAt); - //msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables async(session).messageTransfer(arg::content=msg, arg::acceptMode=1); + if (opts.sync) session.sync(); } session.sync(); } void Sender::sendByRate() { - Message msg(generateData(opts.size), queue); + Message msg(data, queue); if (opts.durable) { msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); } @@ -324,9 +327,9 @@ void Sender::sendByRate() while (true) { uint64_t start_msg(current_time()); msg.getDeliveryProperties().setTimestamp(start_msg); - //msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables async(session).messageTransfer(arg::content=msg, arg::acceptMode=1); - + if (opts.sync) session.sync(); + uint64_t now = current_time(); if (timeLimit != 0 && (now - start) > timeLimit) { diff --git a/qpid/cpp/src/tests/start_cluster b/qpid/cpp/src/tests/start_cluster index 55f989a3e9..6d254190df 100755 --- a/qpid/cpp/src/tests/start_cluster +++ b/qpid/cpp/src/tests/start_cluster @@ -16,7 +16,7 @@ OPTS="-d --load-module ../.libs/libqpidcluster.so --cluster-name=$CLUSTER --no- if test "$SIZE" = "one"; then # Special case of singleton cluster, use default port. ../qpidd -q - with_ais_group ../qpidd $OPTS || exit 1 + with_ais_group ../qpidd $OPTS --log-output=cluster.log || exit 1 else for (( i=0; i<SIZE; ++i )); do PORT=`with_ais_group ../qpidd -p0 --log-output=cluster$i.log $OPTS` || exit 1 diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index 6dfb4d14c3..8d6b5a241e 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -26,12 +26,18 @@ <doc>Qpid extension class to allow clustered brokers to communicate.</doc> <control name = "notify" code="0x1"> + <role name="server" implement="MUST" /> <field name="url" type="str16" /> </control> - <control name="connection-close" code="0x2"/> + <control name="connection-close" code="0x2"> + <role name="server" implement="MUST" /> + </control> - <control name="connection-do-output" code="0x3"/> + <control name="connection-do-output" code="0x3"> + <role name="server" implement="MUST" /> + <field name="bytes" type="uint32"/> + </control> </class> </amqp> |