summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-03-28 22:46:23 +0000
committerAlan Conway <aconway@apache.org>2011-03-28 22:46:23 +0000
commitea98033c3df7fbc1364df69c4edb5cf1e808e87e (patch)
tree32e57b36ffa24a907adc301c08f68d2b53ffda92
parentb51071c7876f42a6ddc44beddfb9593b915ae1b0 (diff)
downloadqpid-python-ea98033c3df7fbc1364df69c4edb5cf1e808e87e.tar.gz
QPID-2920: Cluster batch multicaster.
Send multiple multicast events in a single call to CPG using iovec. Encoding in the sending thread using reference counted buffers. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920@1086435 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/cluster.mk5
-rw-r--r--qpid/cpp/src/qpid/BufferRef.h13
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/BufferFactory.h67
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Core.cpp12
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Core.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp82
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Multicaster.h67
-rw-r--r--qpid/cpp/src/tests/brokertest.py2
-rwxr-xr-xqpid/cpp/src/tests/qpid-build-rinstall2
-rwxr-xr-xqpid/cpp/src/tests/qpid-cpp-benchmark7
10 files changed, 244 insertions, 15 deletions
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk
index 85887e767b..86448d991c 100644
--- a/qpid/cpp/src/cluster.mk
+++ b/qpid/cpp/src/cluster.mk
@@ -18,7 +18,7 @@
#
#
# Cluster library makefile fragment, to be included in Makefile.am
-#
+#
# Optional CMAN support
@@ -111,6 +111,7 @@ cluster2_la_SOURCES = \
qpid/cluster/PollerDispatch.h \
qpid/cluster/exp/BrokerHandler.cpp \
qpid/cluster/exp/BrokerHandler.h \
+ qpid/cluster/exp/BufferFactory.h \
qpid/cluster/exp/Cluster2Plugin.cpp \
qpid/cluster/exp/Core.cpp \
qpid/cluster/exp/Core.h \
@@ -120,6 +121,8 @@ cluster2_la_SOURCES = \
qpid/cluster/exp/HandlerBase.h \
qpid/cluster/exp/MessageHandler.cpp \
qpid/cluster/exp/MessageHandler.h \
+ qpid/cluster/exp/Multicaster.cpp \
+ qpid/cluster/exp/Multicaster.h \
qpid/cluster/exp/WiringHandler.cpp \
qpid/cluster/exp/WiringHandler.h
diff --git a/qpid/cpp/src/qpid/BufferRef.h b/qpid/cpp/src/qpid/BufferRef.h
index bfe1f9ebaa..91525a3fc0 100644
--- a/qpid/cpp/src/qpid/BufferRef.h
+++ b/qpid/cpp/src/qpid/BufferRef.h
@@ -27,7 +27,11 @@
namespace qpid {
-/** Template for mutable or const buffer references */
+/** Reference to a ref-counted buffer of T.
+ * Template for mutable or const buffer references.
+ * Gathers together an intrusive_ptr for refcounting and begin and end pointers
+ * for the buffer space. For use with RefCountedBuffer and similar classes.
+ */
template <class T> class BufferRefT {
public:
BufferRefT() : begin_(0), end_(0) {}
@@ -38,8 +42,15 @@ template <class T> class BufferRefT {
template <class U> BufferRefT(const BufferRefT<U>& other) :
counter(other.counter), begin_(other.begin_), end_(other.end_) {}
+ template <class U> BufferRefT& operator=(const BufferRefT<U>& other) {
+ counter = other.counter; begin_ = other.begin_; end_ = other.end_;
+ }
+
T* begin() const { return begin_; }
T* end() const { return end_; }
+ size_t size() const { return end_ - begin_; }
+ operator bool() const { return begin_; }
+ bool operator!() const { return !begin_; }
/** Return a sub-buffer of the current buffer */
BufferRefT sub_buffer(T* begin, T* end) {
diff --git a/qpid/cpp/src/qpid/cluster/exp/BufferFactory.h b/qpid/cpp/src/qpid/cluster/exp/BufferFactory.h
new file mode 100644
index 0000000000..105b594d2d
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/BufferFactory.h
@@ -0,0 +1,67 @@
+#ifndef QPID_CLUSTER_EXP_BUFFERFACTORY_H
+#define QPID_CLUSTER_EXP_BUFFERFACTORY_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/BufferRef.h"
+#include "qpid/RefCountedBuffer.h"
+#include "qpid/sys/Mutex.h"
+#include <algorithm>
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Factory to allocate sub-buffers of RefCountedBuffers.
+ * Thread safe.
+ */
+class BufferFactory
+{
+ public:
+ /** @param min_size minimum size of underlying buffers. */
+ inline BufferFactory(size_t min_size);
+ inline BufferRef get(size_t size);
+ private:
+ sys::Mutex lock;
+ size_t min_size;
+ BufferRef buf;
+ char* pos;
+};
+
+BufferFactory::BufferFactory(size_t size) : min_size(size) {}
+
+BufferRef BufferFactory::get(size_t size) {
+ sys::Mutex::ScopedLock l(lock);
+ if (!buf || pos + size > buf.end()) {
+ buf = RefCountedBuffer::create(std::max(size, min_size));
+ pos = buf.begin();
+ }
+ assert(buf);
+ assert(buf.begin() <= pos);
+ assert(pos + size <= buf.end());
+ BufferRef ret(buf.sub_buffer(pos, pos+size));
+ pos += size;
+ return ret;
+}
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_EXP_BUFFERFACTORY_H*/
diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.cpp b/qpid/cpp/src/qpid/cluster/exp/Core.cpp
index 93ed96b9d8..e1dba349a1 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Core.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/Core.cpp
@@ -36,7 +36,8 @@ namespace cluster {
Core::Core(const Settings& s, broker::Broker& b) :
broker(b),
- eventHandler(new EventHandler(*this))
+ eventHandler(new EventHandler(*this)),
+ multicaster(eventHandler->getCpg(), b.getPoller(), boost::bind(&Core::fatal, this))
{
eventHandler->add(boost::shared_ptr<HandlerBase>(new WiringHandler(*eventHandler)));
eventHandler->add(boost::shared_ptr<HandlerBase>(new MessageHandler(*eventHandler)));
@@ -61,15 +62,8 @@ void Core::fatal() {
void Core::mcast(const framing::AMQBody& body) {
QPID_LOG(trace, "cluster multicast: " << body);
- // FIXME aconway 2010-10-20: use Multicaster, or bring in its features.
- // here we multicast Frames rather than Events.
framing::AMQFrame f(body);
- std::string data(f.encodedSize(), char());
- framing::Buffer buf(&data[0], data.size());
- f.encode(buf);
- iovec iov = { buf.getPointer(), buf.getSize() };
- while (!eventHandler->getCpg().mcast(&iov, 1))
- ::usleep(1000); // FIXME aconway 2010-10-20: flow control
+ multicaster.mcast(f);
}
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.h b/qpid/cpp/src/qpid/cluster/exp/Core.h
index 3e53e0a65b..8b83a0004d 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Core.h
+++ b/qpid/cpp/src/qpid/cluster/exp/Core.h
@@ -25,6 +25,7 @@
#include <string>
#include <memory>
#include "LockedMap.h"
+#include "Multicaster.h"
#include "qpid/cluster/types.h"
#include "qpid/cluster/Cpg.h"
#include "qpid/broker/QueuedMessage.h"
@@ -87,6 +88,7 @@ class Core
std::auto_ptr<EventHandler> eventHandler; // Handles CPG events.
BrokerHandler* brokerHandler; // Handles broker events.
RoutingMap routingMap;
+ Multicaster multicaster;
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp b/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp
new file mode 100644
index 0000000000..bdf6c33387
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Multicaster.h"
+#include "qpid/cluster/Cpg.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace cluster {
+
+namespace {
+const size_t MAX_IOV = 60; // Limit imposed by CPG
+
+struct ::iovec bufToIov(const BufferRef& buf) {
+ ::iovec iov;
+ iov.iov_base = buf.begin();
+ iov.iov_len = buf.size();
+ return iov;
+}
+}
+
+Multicaster::Multicaster(Cpg& cpg_,
+ const boost::shared_ptr<sys::Poller>& poller,
+ boost::function<void()> onError_) :
+ onError(onError_), cpg(cpg_),
+ queue(boost::bind(&Multicaster::sendMcast, this, _1), poller),
+ ioVector(MAX_IOV),
+ buffers(64*1024) // TODO aconway 2011-03-15: optimum size?
+{
+ queue.start();
+}
+
+void Multicaster::mcast(const framing::AMQDataBlock& data) {
+ BufferRef bufRef = buffers.get(data.encodedSize());
+ framing::Buffer buf(bufRef.begin(), bufRef.size());
+ data.encode(buf);
+ queue.push(bufRef);
+}
+
+Multicaster::PollableEventQueue::Batch::const_iterator
+Multicaster::sendMcast(const PollableEventQueue::Batch& buffers) {
+ try {
+ PollableEventQueue::Batch::const_iterator i = buffers.begin();
+ while( i != buffers.end()) {
+ size_t len = std::min(size_t(buffers.end() - i), MAX_IOV);
+ PollableEventQueue::Batch::const_iterator j = i + len;
+ std::transform(i, j, ioVector.begin(), &bufToIov);
+ if (!cpg.mcast(&ioVector[0], len)) {
+ // CPG didn't send because of CPG flow control.
+ return i;
+ }
+ i = j;
+ }
+ return i;
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(critical, "Multicast error: " << e.what());
+ queue.stop();
+ onError();
+ return buffers.end();
+ }
+}
+
+}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/Multicaster.h b/qpid/cpp/src/qpid/cluster/exp/Multicaster.h
new file mode 100644
index 0000000000..a363eb5a0b
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/Multicaster.h
@@ -0,0 +1,67 @@
+#ifndef QPID_CLUSTER_MULTICASTER_H
+#define QPID_CLUSTER_MULTICASTER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "BufferFactory.h"
+#include "qpid/framing/AMQDataBlock.h"
+#include "qpid/sys/PollableQueue.h"
+#include <sys/uio.h> // For struct iovec
+
+namespace qpid {
+
+namespace sys {
+class Poller;
+}
+
+namespace cluster {
+
+class Cpg;
+
+/**
+ * Multicast to a CPG group in poller threads. Shared, thread safe object.
+ */
+class Multicaster
+{
+ public:
+ Multicaster(Cpg& cpg_,
+ const boost::shared_ptr<sys::Poller>&,
+ boost::function<void()> onError
+ );
+
+ /** Multicast an event */
+ void mcast(const framing::AMQDataBlock&);
+
+ private:
+ typedef sys::PollableQueue<BufferRef> PollableEventQueue;
+
+ PollableEventQueue::Batch::const_iterator sendMcast(const PollableEventQueue::Batch& );
+
+ boost::function<void()> onError;
+ Cpg& cpg;
+ PollableEventQueue queue;
+ std::vector<struct ::iovec> ioVector;
+ BufferFactory buffers;
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_MULTICASTER_H*/
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 67765c0e46..e1ee6d4a23 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -410,6 +410,8 @@ class Cluster:
self.args += [ cluster_name,
"%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ]
self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"]
+ self.args += [ "--log-enable=info+", "--log-enable=trace+:cluster"]
+
assert cluster_lib, "Cannot locate cluster plug-in"
self.args += [ "--load-module", cluster_lib ]
self.start_n(count, expect=expect, wait=wait)
diff --git a/qpid/cpp/src/tests/qpid-build-rinstall b/qpid/cpp/src/tests/qpid-build-rinstall
index 1a92f8750a..ebecee9960 100755
--- a/qpid/cpp/src/tests/qpid-build-rinstall
+++ b/qpid/cpp/src/tests/qpid-build-rinstall
@@ -21,7 +21,7 @@
# Run "make install"" locally then copy the install tree to each of $HOSTS
# Must be run in a configured qpid build directory.
#
-test -f config.status || { echo "Not in a configured build directory."; usage; }
+test -f config.status || { echo "Not in a configured build directory."; exit 1; }
. src/tests/install_env.sh
set -ex
make && make -j1 install
diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark
index 6138108558..98794082eb 100755
--- a/qpid/cpp/src/tests/qpid-cpp-benchmark
+++ b/qpid/cpp/src/tests/qpid-cpp-benchmark
@@ -118,7 +118,7 @@ def start_receive(queue, index, opts, ready_queue, broker, host):
return clients.add(Popen(command, stdout=PIPE))
def start_send(queue, opts, broker, host):
- address="%s;{%s}"%(queue,",".join(opts.send_option))
+ address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:receiver"]))
command = ["qpid-send",
"-b", broker,
"-a", address,
@@ -243,7 +243,9 @@ def main():
for i in xrange(opts.repeat):
delete_queues(queues, opts.broker[0])
ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
- receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next())
+ time.sleep(.1) # FIXME aconway 2011-03-16: new cluster async wiring
+ receivers = [start_receive(q, j, opts, ready_queue,
+ brokers.next(), client_hosts.next())
for q in queues for j in xrange(opts.receivers)]
ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready.
senders = [start_send(q, opts,brokers.next(), client_hosts.next())
@@ -253,7 +255,6 @@ def main():
recv_stats=parse_receivers(receivers)
if opts.summarize: print_summary(send_stats, recv_stats)
else: print_data(send_stats, recv_stats)
- delete_queues(queues, opts.broker[0])
finally: clients.kill() # No strays
if __name__ == "__main__": main()