diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 48 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Event.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Event.h | 12 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/PollableQueue.h | 10 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/types.h | 2 | ||||
-rwxr-xr-x | cpp/src/tests/benchmark | 5 |
7 files changed, 62 insertions, 36 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 4d54a837ca..2b12e4f54a 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -65,12 +65,14 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : boost::bind(&Cluster::dispatch, this, _1), // read 0, // write boost::bind(&Cluster::disconnect, this, _1) // disconnect - ) + ), + deliverQueue(EventQueue::forEach(boost::bind(&Cluster::deliverEvent, this, _1))) { broker->addFinalizer(boost::bind(&Cluster::leave, this)); QPID_LOG(notice, "Joining cluster: " << name.str() << " as " << self); cpg.join(name); - // Start dispatching from the poller. + + deliverQueue.start(poller); cpgDispatchHandle.startWatch(poller); } @@ -173,27 +175,7 @@ void Cluster::deliver( { try { MemberId from(nodeid, pid); - Buffer buf(static_cast<char*>(msg), msg_len); - Connection* connection; - uint8_t type = buf.getOctet(); - decodePtr(buf, connection); - if (connection == 0) { // Cluster controls - AMQFrame frame; - while (frame.decode(buf)) - if (!ClusterOperations(*this, from).invoke(frame)) - throw Exception("Invalid cluster control"); - } - else { // Connection data or control - boost::intrusive_ptr<Connection> c = - getConnection(ConnectionId(from, connection)); - if (type == DATA) - c->deliverBuffer(buf); - else { - AMQFrame frame; - while (frame.decode(buf)) - c->deliver(frame); - } - } + deliverQueue.push(Event::delivered(from, msg, msg_len)); } catch (const std::exception& e) { // FIXME aconway 2008-01-30: exception handling. @@ -203,6 +185,26 @@ void Cluster::deliver( } } +void Cluster::deliverEvent(const Event& e) { + Buffer buf(e); + if (e.getConnection().getConnectionPtr() == 0) { // Cluster control + AMQFrame frame; + while (frame.decode(buf)) + if (!ClusterOperations(*this, e.getConnection().getMember()).invoke(frame)) + throw Exception("Invalid cluster control"); + } + else { // Connection data or control + boost::intrusive_ptr<Connection> c = getConnection(e.getConnection()); + if (e.getType() == DATA) + c->deliverBuffer(buf); + else { // control + AMQFrame frame; + while (frame.decode(buf)) + c->deliver(frame); + } + } +} + struct AddrList { const cpg_address* addrs; int count; diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 630de97093..7d3ef13b14 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -20,6 +20,7 @@ */ #include "qpid/cluster/Cpg.h" +#include "qpid/cluster/Event.h" #include "qpid/cluster/PollableQueue.h" #include "qpid/cluster/NoOpConnectionOutputHandler.h" @@ -85,7 +86,7 @@ class Cluster : public RefCounted, private Cpg::Handler /** Message sent over the cluster. */ typedef std::pair<framing::AMQFrame, ConnectionId> Message; - typedef PollableQueue<Message> MessageQueue; + typedef PollableQueue<Event> EventQueue; boost::function<void()> shutdownNext; @@ -93,6 +94,8 @@ class Cluster : public RefCounted, private Cpg::Handler void deliverFrame(framing::AMQFrame&, const ConnectionId&); void deliverBuffer(const char*, size_t, const ConnectionId&); + + void deliverEvent(const Event&); /** CPG deliver callback. */ void deliver( @@ -132,7 +135,8 @@ class Cluster : public RefCounted, private Cpg::Handler ConnectionMap connections; NoOpConnectionOutputHandler shadowOut; sys::DispatchHandle cpgDispatchHandle; - + PollableQueue<Event> deliverQueue; + friend std::ostream& operator <<(std::ostream&, const Cluster&); friend std::ostream& operator <<(std::ostream&, const UrlMap::value_type&); friend std::ostream& operator <<(std::ostream&, const UrlMap&); diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp index ff558842e4..66f3cf261b 100644 --- a/cpp/src/qpid/cluster/Event.cpp +++ b/cpp/src/qpid/cluster/Event.cpp @@ -25,6 +25,7 @@ namespace qpid { namespace cluster { + using framing::Buffer; const size_t Event::OVERHEAD = 1 /*type*/ + 8 /*64-bit pointr*/; @@ -32,10 +33,14 @@ const size_t Event::OVERHEAD = 1 /*type*/ + 8 /*64-bit pointr*/; Event::Event(EventType t, const ConnectionId c, const size_t s) : type(t), connection(c), size(s), data(RefCountedBuffer::create(s)) {} -Event::Event(const MemberId& m, const char* d, size_t s) - : connection(m, 0), size(s-OVERHEAD), data(RefCountedBuffer::create(size)) -{ - memcpy(data->get(), d, s); +Event Event::delivered(const MemberId& m, void* d, size_t s) { + Buffer buf(static_cast<char*>(d), s); + EventType type((EventType)buf.getOctet()); + ConnectionId connection(m, reinterpret_cast<Connection*>(buf.getLongLong())); + assert(buf.getPosition() == OVERHEAD); + Event e(type, connection, s-OVERHEAD); + memcpy(e.getData(), static_cast<char*>(d)+OVERHEAD, s-OVERHEAD); + return e; } void Event::mcast(const Cpg::Name& name, Cpg& cpg) { diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h index 3e4a19f7f3..14ae253dc2 100644 --- a/cpp/src/qpid/cluster/Event.h +++ b/cpp/src/qpid/cluster/Event.h @@ -25,6 +25,7 @@ #include "types.h" #include "Cpg.h" #include "qpid/RefCountedBuffer.h" +#include "qpid/framing/Buffer.h" namespace qpid { namespace cluster { @@ -39,11 +40,11 @@ namespace cluster { */ struct Event { public: - /** Create an event with for mcasting, with size bytes of space. */ - Event(EventType t, const ConnectionId c, size_t size); + /** Create an event to mcast with a buffer of size bytes. */ + Event(EventType t=DATA, const ConnectionId c=ConnectionId(), size_t size=0); - /** Create an event from delivered data. */ - Event(const MemberId& m, const char* data, size_t size); + /** Create an event copied from delivered data. */ + static Event delivered(const MemberId& m, void* data, size_t size); void mcast(const Cpg::Name& name, Cpg& cpg); @@ -51,6 +52,9 @@ struct Event { ConnectionId getConnection() const { return connection; } size_t getSize() const { return size; } char* getData() { return data->get(); } + const char* getData() const { return data->get(); } + + operator framing::Buffer() const { return framing::Buffer(const_cast<char*>(getData()), getSize()); } private: static const size_t OVERHEAD; diff --git a/cpp/src/qpid/cluster/PollableQueue.h b/cpp/src/qpid/cluster/PollableQueue.h index 74da2df750..1c7720f5c6 100644 --- a/cpp/src/qpid/cluster/PollableQueue.h +++ b/cpp/src/qpid/cluster/PollableQueue.h @@ -27,6 +27,7 @@ #include "qpid/sys/Mutex.h" #include <boost/function.hpp> #include <boost/bind.hpp> +#include <algorithm> #include <deque> namespace qpid { @@ -53,6 +54,15 @@ class PollableQueue { /** Callback to process a range of items. */ typedef boost::function<void (const iterator&, const iterator&)> Callback; + /** Functor tempalate to create a Callback from a functor that handles a single item. */ + template <class F> struct ForEach { + F handleOne; + ForEach(const F& f) : handleOne(f) {} + void operator()(const iterator& i, const iterator& j) const { std::for_each(i, j, handleOne); } + }; + /** Function to create ForEach instances */ + template <class F> static ForEach<F> forEach(const F& f) { return ForEach<F>(f); } + /** When the queue is selected by the poller, values are passed to callback cb. */ explicit PollableQueue(const Callback& cb); diff --git a/cpp/src/qpid/cluster/types.h b/cpp/src/qpid/cluster/types.h index 8911896e1f..0cd6f1afbb 100644 --- a/cpp/src/qpid/cluster/types.h +++ b/cpp/src/qpid/cluster/types.h @@ -34,7 +34,7 @@ namespace cluster { class Connection; -/** Types of cluster messages. */ +/** Types of cluster event. */ enum EventType { DATA, CONTROL }; /** first=node-id, second=pid */ diff --git a/cpp/src/tests/benchmark b/cpp/src/tests/benchmark index d0ad698f9f..c075837847 100755 --- a/cpp/src/tests/benchmark +++ b/cpp/src/tests/benchmark @@ -78,7 +78,6 @@ dosamples() { } | tee $FILE } -echo "benchmark $*" | tee benchmark.tab HEADING="pub sub total Mb" dosamples $SCRIPTDIR/perfdist --size $SIZE --count $COUNT --nsubs $NSUBS --npubs $NPUBS -s -- ${CLIENTS[*]} --- ${BROKERS[*]} HEADING="pub" @@ -89,6 +88,8 @@ HEADING="min max avg" dosamples ssh -A ${CLIENTS[0]} $TESTDIR/echotest --count $ECHO -s -b ${BROKERS[0]} echo -echo "Tab separated spreadsheet (also stored in benchmark.tab):" +echo "Tab separated spreadsheet (also saved as benchmark.tab):" echo + +echo "benchmark -- ${CLIENTS[*]} --- ${BROKERS[*]} " | tee benchmark.tab paste $FILES | tee -a benchmark.tab |