From ea98033c3df7fbc1364df69c4edb5cf1e808e87e Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Mon, 28 Mar 2011 22:46:23 +0000 Subject: QPID-2920: Cluster batch multicaster. Send multiple multicast events in a single call to CPG using iovec. Encoding in the sending thread using reference counted buffers. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920@1086435 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/cluster.mk | 5 +- qpid/cpp/src/qpid/BufferRef.h | 13 ++++- qpid/cpp/src/qpid/cluster/exp/BufferFactory.h | 67 ++++++++++++++++++++++ qpid/cpp/src/qpid/cluster/exp/Core.cpp | 12 +--- qpid/cpp/src/qpid/cluster/exp/Core.h | 2 + qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp | 82 +++++++++++++++++++++++++++ qpid/cpp/src/qpid/cluster/exp/Multicaster.h | 67 ++++++++++++++++++++++ qpid/cpp/src/tests/brokertest.py | 2 + qpid/cpp/src/tests/qpid-build-rinstall | 2 +- qpid/cpp/src/tests/qpid-cpp-benchmark | 7 ++- 10 files changed, 244 insertions(+), 15 deletions(-) create mode 100644 qpid/cpp/src/qpid/cluster/exp/BufferFactory.h create mode 100644 qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp create mode 100644 qpid/cpp/src/qpid/cluster/exp/Multicaster.h diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk index 85887e767b..86448d991c 100644 --- a/qpid/cpp/src/cluster.mk +++ b/qpid/cpp/src/cluster.mk @@ -18,7 +18,7 @@ # # # Cluster library makefile fragment, to be included in Makefile.am -# +# # Optional CMAN support @@ -111,6 +111,7 @@ cluster2_la_SOURCES = \ qpid/cluster/PollerDispatch.h \ qpid/cluster/exp/BrokerHandler.cpp \ qpid/cluster/exp/BrokerHandler.h \ + qpid/cluster/exp/BufferFactory.h \ qpid/cluster/exp/Cluster2Plugin.cpp \ qpid/cluster/exp/Core.cpp \ qpid/cluster/exp/Core.h \ @@ -120,6 +121,8 @@ cluster2_la_SOURCES = \ qpid/cluster/exp/HandlerBase.h \ qpid/cluster/exp/MessageHandler.cpp \ qpid/cluster/exp/MessageHandler.h \ + qpid/cluster/exp/Multicaster.cpp \ + qpid/cluster/exp/Multicaster.h \ qpid/cluster/exp/WiringHandler.cpp \ qpid/cluster/exp/WiringHandler.h diff --git a/qpid/cpp/src/qpid/BufferRef.h b/qpid/cpp/src/qpid/BufferRef.h index bfe1f9ebaa..91525a3fc0 100644 --- a/qpid/cpp/src/qpid/BufferRef.h +++ b/qpid/cpp/src/qpid/BufferRef.h @@ -27,7 +27,11 @@ namespace qpid { -/** Template for mutable or const buffer references */ +/** Reference to a ref-counted buffer of T. + * Template for mutable or const buffer references. + * Gathers together an intrusive_ptr for refcounting and begin and end pointers + * for the buffer space. For use with RefCountedBuffer and similar classes. + */ template class BufferRefT { public: BufferRefT() : begin_(0), end_(0) {} @@ -38,8 +42,15 @@ template class BufferRefT { template BufferRefT(const BufferRefT& other) : counter(other.counter), begin_(other.begin_), end_(other.end_) {} + template BufferRefT& operator=(const BufferRefT& other) { + counter = other.counter; begin_ = other.begin_; end_ = other.end_; + } + T* begin() const { return begin_; } T* end() const { return end_; } + size_t size() const { return end_ - begin_; } + operator bool() const { return begin_; } + bool operator!() const { return !begin_; } /** Return a sub-buffer of the current buffer */ BufferRefT sub_buffer(T* begin, T* end) { diff --git a/qpid/cpp/src/qpid/cluster/exp/BufferFactory.h b/qpid/cpp/src/qpid/cluster/exp/BufferFactory.h new file mode 100644 index 0000000000..105b594d2d --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/BufferFactory.h @@ -0,0 +1,67 @@ +#ifndef QPID_CLUSTER_EXP_BUFFERFACTORY_H +#define QPID_CLUSTER_EXP_BUFFERFACTORY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/BufferRef.h" +#include "qpid/RefCountedBuffer.h" +#include "qpid/sys/Mutex.h" +#include + +namespace qpid { +namespace cluster { + +/** + * Factory to allocate sub-buffers of RefCountedBuffers. + * Thread safe. + */ +class BufferFactory +{ + public: + /** @param min_size minimum size of underlying buffers. */ + inline BufferFactory(size_t min_size); + inline BufferRef get(size_t size); + private: + sys::Mutex lock; + size_t min_size; + BufferRef buf; + char* pos; +}; + +BufferFactory::BufferFactory(size_t size) : min_size(size) {} + +BufferRef BufferFactory::get(size_t size) { + sys::Mutex::ScopedLock l(lock); + if (!buf || pos + size > buf.end()) { + buf = RefCountedBuffer::create(std::max(size, min_size)); + pos = buf.begin(); + } + assert(buf); + assert(buf.begin() <= pos); + assert(pos + size <= buf.end()); + BufferRef ret(buf.sub_buffer(pos, pos+size)); + pos += size; + return ret; +} + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_EXP_BUFFERFACTORY_H*/ diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.cpp b/qpid/cpp/src/qpid/cluster/exp/Core.cpp index 93ed96b9d8..e1dba349a1 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Core.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/Core.cpp @@ -36,7 +36,8 @@ namespace cluster { Core::Core(const Settings& s, broker::Broker& b) : broker(b), - eventHandler(new EventHandler(*this)) + eventHandler(new EventHandler(*this)), + multicaster(eventHandler->getCpg(), b.getPoller(), boost::bind(&Core::fatal, this)) { eventHandler->add(boost::shared_ptr(new WiringHandler(*eventHandler))); eventHandler->add(boost::shared_ptr(new MessageHandler(*eventHandler))); @@ -61,15 +62,8 @@ void Core::fatal() { void Core::mcast(const framing::AMQBody& body) { QPID_LOG(trace, "cluster multicast: " << body); - // FIXME aconway 2010-10-20: use Multicaster, or bring in its features. - // here we multicast Frames rather than Events. framing::AMQFrame f(body); - std::string data(f.encodedSize(), char()); - framing::Buffer buf(&data[0], data.size()); - f.encode(buf); - iovec iov = { buf.getPointer(), buf.getSize() }; - while (!eventHandler->getCpg().mcast(&iov, 1)) - ::usleep(1000); // FIXME aconway 2010-10-20: flow control + multicaster.mcast(f); } }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.h b/qpid/cpp/src/qpid/cluster/exp/Core.h index 3e53e0a65b..8b83a0004d 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Core.h +++ b/qpid/cpp/src/qpid/cluster/exp/Core.h @@ -25,6 +25,7 @@ #include #include #include "LockedMap.h" +#include "Multicaster.h" #include "qpid/cluster/types.h" #include "qpid/cluster/Cpg.h" #include "qpid/broker/QueuedMessage.h" @@ -87,6 +88,7 @@ class Core std::auto_ptr eventHandler; // Handles CPG events. BrokerHandler* brokerHandler; // Handles broker events. RoutingMap routingMap; + Multicaster multicaster; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp b/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp new file mode 100644 index 0000000000..bdf6c33387 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp @@ -0,0 +1,82 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Multicaster.h" +#include "qpid/cluster/Cpg.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace cluster { + +namespace { +const size_t MAX_IOV = 60; // Limit imposed by CPG + +struct ::iovec bufToIov(const BufferRef& buf) { + ::iovec iov; + iov.iov_base = buf.begin(); + iov.iov_len = buf.size(); + return iov; +} +} + +Multicaster::Multicaster(Cpg& cpg_, + const boost::shared_ptr& poller, + boost::function onError_) : + onError(onError_), cpg(cpg_), + queue(boost::bind(&Multicaster::sendMcast, this, _1), poller), + ioVector(MAX_IOV), + buffers(64*1024) // TODO aconway 2011-03-15: optimum size? +{ + queue.start(); +} + +void Multicaster::mcast(const framing::AMQDataBlock& data) { + BufferRef bufRef = buffers.get(data.encodedSize()); + framing::Buffer buf(bufRef.begin(), bufRef.size()); + data.encode(buf); + queue.push(bufRef); +} + +Multicaster::PollableEventQueue::Batch::const_iterator +Multicaster::sendMcast(const PollableEventQueue::Batch& buffers) { + try { + PollableEventQueue::Batch::const_iterator i = buffers.begin(); + while( i != buffers.end()) { + size_t len = std::min(size_t(buffers.end() - i), MAX_IOV); + PollableEventQueue::Batch::const_iterator j = i + len; + std::transform(i, j, ioVector.begin(), &bufToIov); + if (!cpg.mcast(&ioVector[0], len)) { + // CPG didn't send because of CPG flow control. + return i; + } + i = j; + } + return i; + } + catch (const std::exception& e) { + QPID_LOG(critical, "Multicast error: " << e.what()); + queue.stop(); + onError(); + return buffers.end(); + } +} + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/Multicaster.h b/qpid/cpp/src/qpid/cluster/exp/Multicaster.h new file mode 100644 index 0000000000..a363eb5a0b --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/Multicaster.h @@ -0,0 +1,67 @@ +#ifndef QPID_CLUSTER_MULTICASTER_H +#define QPID_CLUSTER_MULTICASTER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "BufferFactory.h" +#include "qpid/framing/AMQDataBlock.h" +#include "qpid/sys/PollableQueue.h" +#include // For struct iovec + +namespace qpid { + +namespace sys { +class Poller; +} + +namespace cluster { + +class Cpg; + +/** + * Multicast to a CPG group in poller threads. Shared, thread safe object. + */ +class Multicaster +{ + public: + Multicaster(Cpg& cpg_, + const boost::shared_ptr&, + boost::function onError + ); + + /** Multicast an event */ + void mcast(const framing::AMQDataBlock&); + + private: + typedef sys::PollableQueue PollableEventQueue; + + PollableEventQueue::Batch::const_iterator sendMcast(const PollableEventQueue::Batch& ); + + boost::function onError; + Cpg& cpg; + PollableEventQueue queue; + std::vector ioVector; + BufferFactory buffers; +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_MULTICASTER_H*/ diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 67765c0e46..e1ee6d4a23 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -410,6 +410,8 @@ class Cluster: self.args += [ cluster_name, "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ] self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"] + self.args += [ "--log-enable=info+", "--log-enable=trace+:cluster"] + assert cluster_lib, "Cannot locate cluster plug-in" self.args += [ "--load-module", cluster_lib ] self.start_n(count, expect=expect, wait=wait) diff --git a/qpid/cpp/src/tests/qpid-build-rinstall b/qpid/cpp/src/tests/qpid-build-rinstall index 1a92f8750a..ebecee9960 100755 --- a/qpid/cpp/src/tests/qpid-build-rinstall +++ b/qpid/cpp/src/tests/qpid-build-rinstall @@ -21,7 +21,7 @@ # Run "make install"" locally then copy the install tree to each of $HOSTS # Must be run in a configured qpid build directory. # -test -f config.status || { echo "Not in a configured build directory."; usage; } +test -f config.status || { echo "Not in a configured build directory."; exit 1; } . src/tests/install_env.sh set -ex make && make -j1 install diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark index 6138108558..98794082eb 100755 --- a/qpid/cpp/src/tests/qpid-cpp-benchmark +++ b/qpid/cpp/src/tests/qpid-cpp-benchmark @@ -118,7 +118,7 @@ def start_receive(queue, index, opts, ready_queue, broker, host): return clients.add(Popen(command, stdout=PIPE)) def start_send(queue, opts, broker, host): - address="%s;{%s}"%(queue,",".join(opts.send_option)) + address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:receiver"])) command = ["qpid-send", "-b", broker, "-a", address, @@ -243,7 +243,9 @@ def main(): for i in xrange(opts.repeat): delete_queues(queues, opts.broker[0]) ready_receiver = ReadyReceiver(ready_queue, opts.broker[0]) - receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next()) + time.sleep(.1) # FIXME aconway 2011-03-16: new cluster async wiring + receivers = [start_receive(q, j, opts, ready_queue, + brokers.next(), client_hosts.next()) for q in queues for j in xrange(opts.receivers)] ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready. senders = [start_send(q, opts,brokers.next(), client_hosts.next()) @@ -253,7 +255,6 @@ def main(): recv_stats=parse_receivers(receivers) if opts.summarize: print_summary(send_stats, recv_stats) else: print_data(send_stats, recv_stats) - delete_queues(queues, opts.broker[0]) finally: clients.kill() # No strays if __name__ == "__main__": main() -- cgit v1.2.1