summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-08-21 18:04:18 +0000
committerAlan Conway <aconway@apache.org>2008-08-21 18:04:18 +0000
commit299f256ef76ecbec842c98260bdc3c7671b0de91 (patch)
tree7068617975bea8b11eb76982f833bc20a2d76e4a
parent9802ade0355cf8d5f70bcaeea64aad86e6f8dc0a (diff)
downloadqpid-python-299f256ef76ecbec842c98260bdc3c7671b0de91.tar.gz
Pre-buffering output strategy for cluster.
Additional hooks in broker code, should not affect standalone broker. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@687813 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/Makefile.am1
-rw-r--r--qpid/cpp/src/cluster.mk6
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/Connection.cpp18
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/Connection.h10
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionState.h16
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp31
-rw-r--r--qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp36
-rw-r--r--qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h28
-rw-r--r--qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp106
-rw-r--r--qpid/cpp/src/qpid/cluster/OutputInterceptor.h74
-rw-r--r--qpid/cpp/src/qpid/cluster/WriteEstimate.cpp63
-rw-r--r--qpid/cpp/src/qpid/cluster/WriteEstimate.h64
-rw-r--r--qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h1
-rw-r--r--qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h54
-rw-r--r--qpid/cpp/src/tests/cluster_test.cpp2
-rw-r--r--qpid/cpp/src/tests/consume.cpp11
-rw-r--r--qpid/cpp/src/tests/latencytest.cpp15
-rwxr-xr-xqpid/cpp/src/tests/start_cluster2
-rw-r--r--qpid/cpp/xml/cluster.xml10
20 files changed, 466 insertions, 87 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 172888cfb8..039c35ed4e 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -546,6 +546,7 @@ nobase_include_HEADERS = \
qpid/sys/ConnectionInputHandler.h \
qpid/sys/ConnectionInputHandlerFactory.h \
qpid/sys/ConnectionOutputHandler.h \
+ qpid/sys/ConnectionOutputHandlerPtr.h \
qpid/sys/DeletionManager.h \
qpid/sys/Dispatcher.h \
qpid/sys/IOHandle.h \
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk
index aa3644785b..934ec0174b 100644
--- a/qpid/cpp/src/cluster.mk
+++ b/qpid/cpp/src/cluster.mk
@@ -19,7 +19,11 @@ libqpidcluster_la_SOURCES = \
qpid/cluster/ShadowConnectionOutputHandler.h \
qpid/cluster/PollableCondition.h \
qpid/cluster/PollableCondition.cpp \
- qpid/cluster/PollableQueue.h
+ qpid/cluster/PollableQueue.h \
+ qpid/cluster/WriteEstimate.h \
+ qpid/cluster/WriteEstimate.cpp \
+ qpid/cluster/OutputInterceptor.h \
+ qpid/cluster/OutputInterceptor.cpp
libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la
diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
index a3692911b2..0b996dedd2 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
+++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
@@ -30,7 +30,7 @@ using sys::Mutex;
Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient)
: frameQueueClosed(false), output(o),
connection(new broker::Connection(this, broker, id, _isClient)),
- identifier(id), initialized(false), isClient(_isClient) {}
+ identifier(id), initialized(false), isClient(_isClient), buffered(0) {}
size_t Connection::decode(const char* buffer, size_t size) {
framing::Buffer in(const_cast<char*>(buffer), size);
@@ -53,7 +53,7 @@ size_t Connection::decode(const char* buffer, size_t size) {
bool Connection::canEncode() {
if (!frameQueueClosed) connection->doOutput();
- Mutex::ScopedLock l(frameQueueLock);
+ Mutex::ScopedLock l(frameQueueLock);
return (!isClient && !initialized) || !frameQueue.empty();
}
@@ -71,10 +71,12 @@ size_t Connection::encode(const char* buffer, size_t size) {
initialized = true;
QPID_LOG(trace, "SENT " << identifier << " INIT(" << pi << ")");
}
- while (!frameQueue.empty() && (frameQueue.front().size() <= out.available())) {
+ size_t frameSize=0;
+ while (!frameQueue.empty() && ((frameSize=frameQueue.front().size()) <= out.available())) {
frameQueue.front().encode(out);
QPID_LOG(trace, "SENT [" << identifier << "]: " << frameQueue.front());
- frameQueue.pop();
+ frameQueue.pop_front();
+ buffered -= frameSize;
}
assert(frameQueue.empty() || frameQueue.front().size() <= size);
if (!frameQueue.empty() && frameQueue.front().size() > size)
@@ -98,7 +100,8 @@ void Connection::send(framing::AMQFrame& f) {
{
Mutex::ScopedLock l(frameQueueLock);
if (!frameQueueClosed)
- frameQueue.push(f);
+ frameQueue.push_back(f);
+ buffered += f.size();
}
activateOutput();
}
@@ -107,4 +110,9 @@ framing::ProtocolVersion Connection::getVersion() const {
return framing::ProtocolVersion(0,10);
}
+size_t Connection::getBuffered() const {
+ Mutex::ScopedLock l(frameQueueLock);
+ return buffered;
+}
+
}} // namespace qpid::amqp_0_10
diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.h b/qpid/cpp/src/qpid/amqp_0_10/Connection.h
index b707031789..f6fb87f928 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/Connection.h
+++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.h
@@ -26,7 +26,7 @@
#include "qpid/sys/Mutex.h"
#include "qpid/broker/Connection.h"
#include <boost/intrusive_ptr.hpp>
-#include <queue>
+#include <deque>
#include <memory>
namespace qpid {
@@ -36,7 +36,9 @@ namespace amqp_0_10 {
class Connection : public sys::ConnectionCodec,
public sys::ConnectionOutputHandler
{
- std::queue<framing::AMQFrame> frameQueue;
+ typedef std::deque<framing::AMQFrame> FrameQueue;
+
+ FrameQueue frameQueue;
bool frameQueueClosed;
mutable sys::Mutex frameQueueLock;
sys::OutputControl& output;
@@ -44,7 +46,8 @@ class Connection : public sys::ConnectionCodec,
std::string identifier;
bool initialized;
bool isClient;
-
+ size_t buffered;
+
public:
Connection(sys::OutputControl&, broker::Broker&, const std::string& id, bool isClient = false);
size_t decode(const char* buffer, size_t size);
@@ -56,6 +59,7 @@ class Connection : public sys::ConnectionCodec,
void close(); // closing from this end.
void send(framing::AMQFrame&);
framing::ProtocolVersion getVersion() const;
+ size_t getBuffered() const;
};
}} // namespace qpid::amqp_0_10
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index ab18d1f035..d65dbaeec7 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -79,7 +79,7 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
void Connection::requestIOProcessing(boost::function0<void> callback)
{
ioCallback = callback;
- out->activateOutput();
+ out.activateOutput();
}
Connection::~Connection()
@@ -178,7 +178,6 @@ void Connection::closedImpl(){ // Physically closed, suspend open sessions.
try {
while (!channels.empty())
ptr_map_ptr(channels.begin())->handleDetach();
- // FIXME aconway 2008-07-15: exclusive is per-session not per-connection in 0-10.
while (!exclusiveQueues.empty()) {
Queue::shared_ptr q(exclusiveQueues.front());
q->releaseExclusiveOwnership();
@@ -245,7 +244,7 @@ Manageable::status_t Connection::ManagementMethod(uint32_t methodId, Args&)
case management::Connection::METHOD_CLOSE :
mgmtClosing = true;
if (mgmtObject != 0) mgmtObject->set_closing(1);
- out->activateOutput();
+ out.activateOutput();
status = Manageable::STATUS_OK;
break;
}
diff --git a/qpid/cpp/src/qpid/broker/ConnectionState.h b/qpid/cpp/src/qpid/broker/ConnectionState.h
index c9cf6ece8d..97055f8b2e 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionState.h
+++ b/qpid/cpp/src/qpid/broker/ConnectionState.h
@@ -24,7 +24,7 @@
#include <vector>
#include "qpid/sys/AggregateOutput.h"
-#include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/sys/ConnectionOutputHandlerPtr.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/management/Manageable.h"
#include "Broker.h"
@@ -34,11 +34,14 @@ namespace broker {
class ConnectionState : public ConnectionToken, public management::Manageable
{
+ protected:
+ sys::ConnectionOutputHandlerPtr out;
+
public:
- ConnectionState(qpid::sys::ConnectionOutputHandler* o, Broker& b) :
+ ConnectionState(qpid::sys::ConnectionOutputHandler* o, Broker& b) :
+ out(o),
broker(b),
- outputTasks(*o),
- out(o),
+ outputTasks(out),
framemax(65535),
heartbeat(0),
stagingThreshold(broker.getStagingThreshold())
@@ -67,14 +70,13 @@ class ConnectionState : public ConnectionToken, public management::Manageable
//contained output tasks
sys::AggregateOutput outputTasks;
- sys::ConnectionOutputHandler& getOutput() const { return *out; }
+ sys::ConnectionOutputHandlerPtr& getOutput() { return out; }
framing::ProtocolVersion getVersion() const { return version; }
- void setOutputHandler(qpid::sys::ConnectionOutputHandler* o) { out = o; }
+ void setOutputHandler(qpid::sys::ConnectionOutputHandler* o) { out.set(o); }
protected:
framing::ProtocolVersion version;
- sys::ConnectionOutputHandler* out;
uint32_t framemax;
uint16_t heartbeat;
uint64_t stagingThreshold;
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 05ab9148b5..75ce19e835 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -25,12 +25,14 @@
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ClusterNotifyBody.h"
#include "qpid/framing/ClusterConnectionCloseBody.h"
+#include "qpid/framing/ClusterConnectionDoOutputBody.h"
#include "qpid/log/Statement.h"
#include "qpid/memory.h"
#include "qpid/shared_ptr.h"
#include <boost/bind.hpp>
#include <boost/cast.hpp>
+#include <boost/current_function.hpp>
#include <algorithm>
#include <iterator>
#include <map>
@@ -76,11 +78,6 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
cpg.join(name);
notify();
- // FIXME aconway 2008-08-11: can we remove this loop?
- // Dispatch till we show up in the cluster map.
- while (empty())
- cpg.dispatchOne();
-
// Start dispatching from the poller.
cpgDispatchHandle.startWatch(poller);
deliverQueue.start(poller);
@@ -97,9 +94,8 @@ Cluster::~Cluster() {
std::for_each(localConnectionSet.begin(), localConnectionSet.end(), boost::bind(&ConnectionInterceptor::dirtyClose, _1));
}
-// local connection initializes plugins
void Cluster::initialize(broker::Connection& c) {
- bool isLocal = &c.getOutput() != &shadowOut;
+ bool isLocal = c.getOutput().get() != &shadowOut;
if (isLocal)
localConnectionSet.insert(new ConnectionInterceptor(c, *this));
}
@@ -107,10 +103,8 @@ void Cluster::initialize(broker::Connection& c) {
void Cluster::leave() {
Mutex::ScopedLock l(lock);
if (!broker) return; // Already left.
- // At this point the poller has already been shut down so
- // no dispatches can occur thru the cpgDispatchHandle.
- //
- // FIXME aconway 2008-08-11: assert this is the cae.
+ // Leave is called by from Broker destructor after the poller has
+ // been shut down. No dispatches can occur.
QPID_LOG(debug, "Leaving cluster " << *this);
cpg.leave(name);
@@ -173,13 +167,6 @@ Cluster::MemberList Cluster::getMembers() const {
return result;
}
-// ################ HERE - leaking shadow connections.
-// FIXME aconway 2008-08-11: revisit memory management for shadow
-// connections, what if the Connection is closed other than via
-// disconnect? Dangling pointer in shadow map. Use ptr_map for shadow
-// map, add deleted state to ConnectionInterceptor? Interceptors need
-// to know about map? Check how Connections can be deleted.
-
ConnectionInterceptor* Cluster::getShadowConnection(const Cpg::Id& member, void* remotePtr) {
ShadowConnectionId id(member, remotePtr);
ShadowConnectionMap::iterator i = shadowConnectionMap.find(id);
@@ -274,7 +261,8 @@ void Cluster::handleMethod(Id from, ConnectionInterceptor* connection, AMQMethod
break;
}
case CLUSTER_CONNECTION_DO_OUTPUT_METHOD_ID: {
- connection->deliverDoOutput();
+ ClusterConnectionDoOutputBody& doOutput = static_cast<ClusterConnectionDoOutputBody&>(method);
+ connection->deliverDoOutput(doOutput.getBytes());
break;
}
default:
@@ -309,9 +297,8 @@ void Cluster::dispatch(sys::DispatchHandle& h) {
void Cluster::disconnect(sys::DispatchHandle& h) {
h.stopWatch();
- // FIXME aconway 2008-08-11: error handling if we are disconnected.
- // Kill the broker?
- assert(0);
+ QPID_LOG(critical, "Disconnected from cluster, shutting down");
+ broker.shutdown();
}
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp b/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
index c13651eccb..efcab1b731 100644
--- a/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
+++ b/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
@@ -22,6 +22,8 @@
#include "qpid/framing/ClusterConnectionCloseBody.h"
#include "qpid/framing/ClusterConnectionDoOutputBody.h"
#include "qpid/framing/AMQFrame.h"
+#include <boost/current_function.hpp>
+
namespace qpid {
namespace cluster {
@@ -32,24 +34,27 @@ template <class T, class U, class V> void shift(T& a, U& b, const V& c) { a = b;
ConnectionInterceptor::ConnectionInterceptor(
broker::Connection& conn, Cluster& clust, Cluster::ShadowConnectionId shadowId_)
- : connection(&conn), cluster(clust), isClosed(false), shadowId(shadowId_)
+ : connection(&conn), cluster(clust), isClosed(false), shadowId(shadowId_), output(*this, *conn.getOutput().get())
{
connection->addFinalizer(boost::bind(operator delete, this));
+ connection->setOutputHandler(&output),
// Attach my functions to Connection extension points.
shift(receivedNext, connection->receivedFn, boost::bind(&ConnectionInterceptor::received, this, _1));
shift(closedNext, connection->closedFn, boost::bind(&ConnectionInterceptor::closed, this));
- shift(doOutputNext, connection->doOutputFn, boost::bind(&ConnectionInterceptor::doOutput, this));
+ shift(output.doOutputNext, connection->doOutputFn, boost::bind(&OutputInterceptor::doOutput, &output));
}
ConnectionInterceptor::~ConnectionInterceptor() {
assert(connection == 0);
}
+// Forward all received frames to the cluster, continue handling on delivery.
void ConnectionInterceptor::received(framing::AMQFrame& f) {
if (isClosed) return;
cluster.send(f, this);
}
+// Continue normal handling of delivered frames.
void ConnectionInterceptor::deliver(framing::AMQFrame& f) {
receivedNext(f);
}
@@ -81,28 +86,17 @@ void ConnectionInterceptor::deliverClosed() {
}
void ConnectionInterceptor::dirtyClose() {
- // Not closed via cluster self-delivery but closed locally.
- // Used for dirty cluster shutdown where active connections
- // must be cleaned up.
+ // Not closed via cluster self-delivery but closed locally. Used
+ // when local broker is shut down without a clean cluster shutdown.
+ // Release the connection, it will delete this.
connection = 0;
}
-bool ConnectionInterceptor::doOutput() {
- // FIXME aconway 2008-08-15: this is not correct.
- // Run in write threads so order of execution of doOutput is not determinate.
- // Will only work reliably for in single-consumer tests.
-
- if (connection->hasOutput()) {
- cluster.send(AMQFrame(in_place<ClusterConnectionDoOutputBody>()), this);
- return doOutputNext();
- }
- return false;
-}
-
-void ConnectionInterceptor::deliverDoOutput() {
- // FIXME aconway 2008-08-15: see comment in doOutput.
- if (isShadow())
- doOutputNext();
+// Delivery of doOutput allows us to run the real connection doOutput()
+// which stocks up the write buffers with data.
+//
+void ConnectionInterceptor::deliverDoOutput(size_t requested) {
+ output.deliverDoOutput(requested);
}
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h b/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h
index 370572bd9d..9216921067 100644
--- a/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h
+++ b/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h
@@ -1,5 +1,5 @@
-#ifndef QPID_CLUSTER_CONNECTIONPLUGIN_H
-#define QPID_CLUSTER_CONNECTIONPLUGIN_H
+#ifndef QPID_CLUSTER_CONNECTIONINTERCEPTOR_H
+#define QPID_CLUSTER_CONNECTIONINTERCEPTOR_H
/*
*
@@ -23,6 +23,8 @@
*/
#include "Cluster.h"
+#include "WriteEstimate.h"
+#include "OutputInterceptor.h"
#include "qpid/broker/Connection.h"
#include "qpid/sys/ConnectionOutputHandler.h"
@@ -41,42 +43,46 @@ class ConnectionInterceptor {
Cluster::ShadowConnectionId getShadowId() const { return shadowId; }
- bool isLocal() const { return shadowId == Cluster::ShadowConnectionId(0,0); }
-
+ bool isShadow() const { return shadowId != Cluster::ShadowConnectionId(0,0); }
+ bool isLocal() const { return !isShadow(); }
+ bool getClosed() const { return isClosed; }
+
// self-delivery of intercepted extension points.
void deliver(framing::AMQFrame& f);
void deliverClosed();
- void deliverDoOutput();
+ void deliverDoOutput(size_t requested);
void dirtyClose();
+ Cluster& getCluster() { return cluster; }
+
private:
struct NullConnectionHandler : public qpid::sys::ConnectionOutputHandler {
void close() {}
void send(framing::AMQFrame&) {}
- void doOutput() {}
void activateOutput() {}
};
- bool isShadow() { return shadowId != Cluster::ShadowConnectionId(0,0); }
-
// Functions to intercept to Connection extension points.
void received(framing::AMQFrame&);
void closed();
bool doOutput();
+ void activateOutput();
+
+ void sendDoOutput();
boost::function<void (framing::AMQFrame&)> receivedNext;
boost::function<void ()> closedNext;
- boost::function<bool ()> doOutputNext;
boost::intrusive_ptr<broker::Connection> connection;
Cluster& cluster;
NullConnectionHandler discardHandler;
bool isClosed;
Cluster::ShadowConnectionId shadowId;
+ WriteEstimate writeEstimate;
+ OutputInterceptor output;
};
}} // namespace qpid::cluster
-#endif /*!QPID_CLUSTER_CONNECTIONPLUGIN_H*/
-
+#endif /*!QPID_CLUSTER_CONNECTIONINTERCEPTOR_H*/
diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
new file mode 100644
index 0000000000..84d3a6ad69
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "OutputInterceptor.h"
+#include "ConnectionInterceptor.h"
+#include "qpid/framing/ClusterConnectionDoOutputBody.h"
+#include "qpid/framing/AMQFrame.h"
+#include <boost/current_function.hpp>
+
+
+namespace qpid {
+namespace cluster {
+
+using namespace framing;
+
+OutputInterceptor::OutputInterceptor(ConnectionInterceptor& p, sys::ConnectionOutputHandler& h)
+ : parent(p), next(h), sent(), moreOutput(), doingOutput()
+{}
+
+void OutputInterceptor::send(framing::AMQFrame& f) {
+ Locker l(lock);
+ next.send(f);
+ sent += f.size();
+}
+
+void OutputInterceptor::activateOutput() {
+ Locker l(lock);
+ moreOutput = true;
+ sendDoOutput();
+}
+
+// Called in write thread when the IO layer has no more data to write.
+// We do nothing in the write thread, we run doOutput only on delivery
+// of doOutput requests.
+bool OutputInterceptor::doOutput() {
+ return false;
+}
+
+// Delivery of doOutput allows us to run the real connection doOutput()
+// which stocks up the write buffers with data.
+//
+void OutputInterceptor::deliverDoOutput(size_t requested) {
+ if (parent.getClosed()) return;
+
+ Locker l(lock);
+ size_t buf = next.getBuffered();
+ if (parent.isLocal())
+ writeEstimate.delivered(sent, buf); // Update the estimate.
+
+ // Run the real doOutput() till we have added the requested data or there's nothing to output.
+ sent = 0;
+ do {
+ sys::Mutex::ScopedUnlock u(lock);
+ moreOutput = doOutputNext(); // Calls send()
+ } while (sent < requested && moreOutput);
+ sent += buf; // Include buffered data in the sent total.
+
+ QPID_LOG(trace, "Delivered doOutput: requested=" << requested << " output=" << sent << " more=" << moreOutput);
+
+ if (parent.isLocal() && moreOutput)
+ sendDoOutput();
+ else
+ doingOutput = false;
+}
+
+void OutputInterceptor::startDoOutput() {
+ if (!doingOutput)
+ sendDoOutput();
+}
+
+// Send a doOutput request if one is not already in flight.
+void OutputInterceptor::sendDoOutput() {
+ // Call with lock held.
+ if (parent.isShadow() || parent.getClosed())
+ return;
+
+ doingOutput = true;
+ size_t request = writeEstimate.sending(getBuffered());
+
+ // Note we may send 0 size request if there's more than 2*estimate in the buffer.
+ // Send it anyway to keep the doOutput chain going until we are sure there's no more output
+ // (in deliverDoOutput)
+ //
+ parent.getCluster().send(AMQFrame(in_place<ClusterConnectionDoOutputBody>(
+ framing::ProtocolVersion(), request)), &parent);
+ QPID_LOG(trace, &parent << "Send doOutput request for " << request);
+}
+
+}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.h b/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
new file mode 100644
index 0000000000..b39f2a2be9
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
@@ -0,0 +1,74 @@
+#ifndef QPID_CLUSTER_OUTPUTINTERCEPTOR_H
+#define QPID_CLUSTER_OUTPUTINTERCEPTOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "WriteEstimate.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/broker/ConnectionFactory.h"
+#include <boost/function.hpp>
+
+namespace qpid {
+namespace framing { class AMQFrame; }
+namespace cluster {
+
+class ConnectionInterceptor;
+
+/**
+ * Interceptor for connection OutputHandler, manages outgoing message replication.
+ */
+class OutputInterceptor : public sys::ConnectionOutputHandler {
+ public:
+ OutputInterceptor(ConnectionInterceptor& p, sys::ConnectionOutputHandler& h);
+
+ // sys::ConnectionOutputHandler functions
+ void send(framing::AMQFrame& f);
+ void activateOutput();
+ void close() { Locker l(lock); next.close(); }
+ size_t getBuffered() const { Locker l(lock); return next.getBuffered(); }
+
+ // Delivery point for doOutput requests.
+ void deliverDoOutput(size_t requested);
+ // Intercept doOutput requests on Connection.
+ bool doOutput();
+
+ boost::function<bool ()> doOutputNext;
+
+ ConnectionInterceptor& parent;
+
+ private:
+ typedef sys::Mutex::ScopedLock Locker;
+
+ void startDoOutput();
+ void sendDoOutput();
+
+ mutable sys::Mutex lock;
+ sys::ConnectionOutputHandler& next;
+ size_t sent;
+ WriteEstimate writeEstimate;
+ bool moreOutput;
+ bool doingOutput;
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_OUTPUTINTERCEPTOR_H*/
diff --git a/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp b/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp
new file mode 100644
index 0000000000..81131be437
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "WriteEstimate.h"
+#include "qpid/log/Statement.h"
+#include <boost/current_function.hpp>
+
+namespace qpid {
+namespace cluster {
+
+WriteEstimate::WriteEstimate(size_t initial)
+ : growing(true), estimate(initial) {}
+
+size_t WriteEstimate::sending(size_t buffered) {
+ // We want to send a doOutput request for enough data such
+ // that if estimate bytes are written before it is self
+ // delivered then what is left in the buffer plus the doOutput
+ // request will be estimate bytes.
+
+ size_t predictLeft = (buffered > estimate) ? buffered - estimate : 0;
+ size_t request = (estimate > predictLeft) ? estimate - predictLeft : 0;
+ return request;
+}
+
+size_t pad(size_t value) { return value + value/2; }
+
+void WriteEstimate::delivered(size_t sent, size_t buffered) {
+ size_t wrote = sent > buffered ? sent - buffered : 0;
+ if (wrote == 0) // No change
+ return;
+ if (buffered > 0) { // Buffer was over-stocked, we wrote to capacity.
+ growing = false;
+ estimate = pad(wrote); // Estimate at 1.5 write for padding.
+ }
+ else if (wrote > estimate) { // Wrote everything, buffer was under-stocked
+ if (growing)
+ estimate = std::max(estimate*2, pad(wrote)); // Grow quickly if we have not yet seen an over-stock.
+ else
+ estimate = pad(wrote);
+ }
+}
+
+}} // namespace qpid::cluster
+
+
diff --git a/qpid/cpp/src/qpid/cluster/WriteEstimate.h b/qpid/cpp/src/qpid/cluster/WriteEstimate.h
new file mode 100644
index 0000000000..01ab2a3e34
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/WriteEstimate.h
@@ -0,0 +1,64 @@
+#ifndef QPID_CLUSTER_WRITEESTIMATE_H
+#define QPID_CLUSTER_WRITEESTIMATE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Mutex.h"
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Estimate the amount of data that a connection can write between sending
+ * a doOutput notice and re-receiving it.
+ *
+ * The goal is to avoid ever write-idling the connection by sending
+ * the next doOutput request as soon as we've processed the previous
+ * one, such that data generated by the previous request will keep the
+ * writer busy till the next one is delivered.
+ *
+ */
+class WriteEstimate
+{
+ public:
+ WriteEstimate(size_t initial=4096);
+
+ /** About to send a doOutput request.
+ * Update estimation state and return size for next request.
+ */
+ size_t sending(size_t buffered);
+
+ /**
+ * doOutput request just delivered, not yet executed. Update the estimate.
+ * and estimate how much data to request in the next onOutput
+ * request. 0 means don't send an onOutput request.
+ */
+ void delivered(size_t sent, size_t buffered);
+
+ private:
+ bool growing;
+ size_t estimate;
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_WRITEESTIMATE_H*/
diff --git a/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h b/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h
index 5a60ae4998..de0bef3630 100644
--- a/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h
+++ b/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h
@@ -34,6 +34,7 @@ class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler, pub
{
public:
virtual void close() = 0;
+ virtual size_t getBuffered() const { return 0; }
};
}}
diff --git a/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h b/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
new file mode 100644
index 0000000000..c2da8edc2c
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
@@ -0,0 +1,54 @@
+#ifndef QPID_SYS_CONNECTIONOUTPUTHANDLERPTR_H
+#define QPID_SYS_CONNECTIONOUTPUTHANDLERPTR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ConnectionOutputHandler.h"
+
+namespace qpid {
+namespace sys {
+
+/**
+ * A ConnectionOutputHandler that delegates to another
+ * ConnectionOutputHandler. Allows the "real" ConnectionOutputHandler
+ * to be changed modified without updating all the pointers/references
+ * using the ConnectionOutputHandlerPtr
+ */
+class ConnectionOutputHandlerPtr : public ConnectionOutputHandler
+{
+ public:
+ ConnectionOutputHandlerPtr(ConnectionOutputHandler* p) : next(p) {}
+ void set(ConnectionOutputHandler* p) { next = p; }
+ ConnectionOutputHandler* get() { return next; }
+ const ConnectionOutputHandler* get() const { return next; }
+
+ void close() { next->close(); }
+ size_t getBuffered() const { return next->getBuffered(); }
+ void activateOutput() { next->activateOutput(); }
+ void send(framing::AMQFrame& f) { next->send(f); }
+
+ private:
+ ConnectionOutputHandler* next;
+};
+}} // namespace qpid::sys
+
+#endif /*!QPID_SYS_CONNECTIONOUTPUTHANDLERPTR_H*/
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp
index 49c5264990..8f3927732d 100644
--- a/qpid/cpp/src/tests/cluster_test.cpp
+++ b/qpid/cpp/src/tests/cluster_test.cpp
@@ -78,7 +78,7 @@ ClusterFixture::ClusterFixture(size_t n) : name(Uuid(true).str()) {
::sleep(1);
--retry;
}
- BOOST_CHECK_EQUAL(n, getGlobalCluster()->size());
+ BOOST_REQUIRE_EQUAL(n, getGlobalCluster()->size());
}
void ClusterFixture::add() {
diff --git a/qpid/cpp/src/tests/consume.cpp b/qpid/cpp/src/tests/consume.cpp
index fecf6bb315..c20a738755 100644
--- a/qpid/cpp/src/tests/consume.cpp
+++ b/qpid/cpp/src/tests/consume.cpp
@@ -42,13 +42,15 @@ struct Args : public qpid::TestOptions {
uint count;
uint ack;
string queue;
-
+ bool declare;
+
Args() : count(0), ack(1)
{
addOptions()
("count", optValue(count, "N"), "number of messages to publish")
("ack-frequency", optValue(ack, "N"), "ack every N messages (0 means use no-ack mode)")
- ("queue", optValue(queue, "<queue name>"), "queue to consume from");
+ ("queue", optValue(queue, "<queue name>"), "queue to consume from")
+ ("declare", optValue(declare), "declare the queue");
}
};
@@ -67,7 +69,8 @@ struct Client
void consume()
{
-
+ if (opts.declare)
+ session.queueDeclare(opts.queue);
SubscriptionManager subs(session);
LocalQueue lq(AckPolicy(opts.ack));
subs.setAcceptMode(opts.ack > 0 ? 0 : 1);
@@ -77,7 +80,7 @@ struct Client
Message msg;
for (size_t i = 0; i < opts.count; ++i) {
msg=lq.pop();
- std::cout << "Received: " << msg.getMessageProperties().getCorrelationId() << std::endl;
+ QPID_LOG(info, "Received: " << msg.getMessageProperties().getCorrelationId());
}
if (opts.ack != 0)
subs.getAckPolicy().ackOutstanding(session); // Cumulative ack for final batch.
diff --git a/qpid/cpp/src/tests/latencytest.cpp b/qpid/cpp/src/tests/latencytest.cpp
index e1a6f156a5..6c3fdd23bd 100644
--- a/qpid/cpp/src/tests/latencytest.cpp
+++ b/qpid/cpp/src/tests/latencytest.cpp
@@ -44,6 +44,7 @@ struct Args : public qpid::TestOptions {
uint size;
uint count;
uint rate;
+ bool sync;
uint reportFrequency;
uint timeLimit;
uint queues;
@@ -65,6 +66,7 @@ struct Args : public qpid::TestOptions {
("queues", optValue(queues, "N"), "number of queues")
("count", optValue(count, "N"), "number of messages to send")
("rate", optValue(rate, "N"), "target message rate (causes count to be ignored)")
+ ("sync", optValue(sync), "send messages synchronously")
("report-frequency", optValue(reportFrequency, "N"),
"number of milliseconds to wait between reports (ignored unless rate specified)")
("time-limit", optValue(timeLimit, "N"),
@@ -143,6 +145,7 @@ class Sender : public Client
void sendByRate();
void sendByCount();
Receiver& receiver;
+ const string data;
public:
Sender(const string& queue, Receiver& receiver);
void test();
@@ -285,7 +288,7 @@ void Stats::reset()
totalLatency = maxLatency = minLatency = 0;
}
-Sender::Sender(const string& q, Receiver& receiver) : Client(q), receiver(receiver) {}
+Sender::Sender(const string& q, Receiver& receiver) : Client(q), receiver(receiver), data(generateData(opts.size)) {}
void Sender::test()
{
@@ -295,7 +298,7 @@ void Sender::test()
void Sender::sendByCount()
{
- Message msg(generateData(opts.size), queue);
+ Message msg(data, queue);
if (opts.durable) {
msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
}
@@ -303,15 +306,15 @@ void Sender::sendByCount()
for (uint i = 0; i < opts.count; i++) {
uint64_t sentAt(current_time());
msg.getDeliveryProperties().setTimestamp(sentAt);
- //msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables
async(session).messageTransfer(arg::content=msg, arg::acceptMode=1);
+ if (opts.sync) session.sync();
}
session.sync();
}
void Sender::sendByRate()
{
- Message msg(generateData(opts.size), queue);
+ Message msg(data, queue);
if (opts.durable) {
msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
}
@@ -324,9 +327,9 @@ void Sender::sendByRate()
while (true) {
uint64_t start_msg(current_time());
msg.getDeliveryProperties().setTimestamp(start_msg);
- //msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables
async(session).messageTransfer(arg::content=msg, arg::acceptMode=1);
-
+ if (opts.sync) session.sync();
+
uint64_t now = current_time();
if (timeLimit != 0 && (now - start) > timeLimit) {
diff --git a/qpid/cpp/src/tests/start_cluster b/qpid/cpp/src/tests/start_cluster
index 55f989a3e9..6d254190df 100755
--- a/qpid/cpp/src/tests/start_cluster
+++ b/qpid/cpp/src/tests/start_cluster
@@ -16,7 +16,7 @@ OPTS="-d --load-module ../.libs/libqpidcluster.so --cluster-name=$CLUSTER --no-
if test "$SIZE" = "one"; then # Special case of singleton cluster, use default port.
../qpidd -q
- with_ais_group ../qpidd $OPTS || exit 1
+ with_ais_group ../qpidd $OPTS --log-output=cluster.log || exit 1
else
for (( i=0; i<SIZE; ++i )); do
PORT=`with_ais_group ../qpidd -p0 --log-output=cluster$i.log $OPTS` || exit 1
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index 6dfb4d14c3..8d6b5a241e 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -26,12 +26,18 @@
<doc>Qpid extension class to allow clustered brokers to communicate.</doc>
<control name = "notify" code="0x1">
+ <role name="server" implement="MUST" />
<field name="url" type="str16" />
</control>
- <control name="connection-close" code="0x2"/>
+ <control name="connection-close" code="0x2">
+ <role name="server" implement="MUST" />
+ </control>
- <control name="connection-do-output" code="0x3"/>
+ <control name="connection-do-output" code="0x3">
+ <role name="server" implement="MUST" />
+ <field name="bytes" type="uint32"/>
+ </control>
</class>
</amqp>