summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp48
-rw-r--r--cpp/src/qpid/cluster/Cluster.h8
-rw-r--r--cpp/src/qpid/cluster/Event.cpp13
-rw-r--r--cpp/src/qpid/cluster/Event.h12
-rw-r--r--cpp/src/qpid/cluster/PollableQueue.h10
-rw-r--r--cpp/src/qpid/cluster/types.h2
-rwxr-xr-xcpp/src/tests/benchmark5
7 files changed, 62 insertions, 36 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 4d54a837ca..2b12e4f54a 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -65,12 +65,14 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
boost::bind(&Cluster::dispatch, this, _1), // read
0, // write
boost::bind(&Cluster::disconnect, this, _1) // disconnect
- )
+ ),
+ deliverQueue(EventQueue::forEach(boost::bind(&Cluster::deliverEvent, this, _1)))
{
broker->addFinalizer(boost::bind(&Cluster::leave, this));
QPID_LOG(notice, "Joining cluster: " << name.str() << " as " << self);
cpg.join(name);
- // Start dispatching from the poller.
+
+ deliverQueue.start(poller);
cpgDispatchHandle.startWatch(poller);
}
@@ -173,27 +175,7 @@ void Cluster::deliver(
{
try {
MemberId from(nodeid, pid);
- Buffer buf(static_cast<char*>(msg), msg_len);
- Connection* connection;
- uint8_t type = buf.getOctet();
- decodePtr(buf, connection);
- if (connection == 0) { // Cluster controls
- AMQFrame frame;
- while (frame.decode(buf))
- if (!ClusterOperations(*this, from).invoke(frame))
- throw Exception("Invalid cluster control");
- }
- else { // Connection data or control
- boost::intrusive_ptr<Connection> c =
- getConnection(ConnectionId(from, connection));
- if (type == DATA)
- c->deliverBuffer(buf);
- else {
- AMQFrame frame;
- while (frame.decode(buf))
- c->deliver(frame);
- }
- }
+ deliverQueue.push(Event::delivered(from, msg, msg_len));
}
catch (const std::exception& e) {
// FIXME aconway 2008-01-30: exception handling.
@@ -203,6 +185,26 @@ void Cluster::deliver(
}
}
+void Cluster::deliverEvent(const Event& e) {
+ Buffer buf(e);
+ if (e.getConnection().getConnectionPtr() == 0) { // Cluster control
+ AMQFrame frame;
+ while (frame.decode(buf))
+ if (!ClusterOperations(*this, e.getConnection().getMember()).invoke(frame))
+ throw Exception("Invalid cluster control");
+ }
+ else { // Connection data or control
+ boost::intrusive_ptr<Connection> c = getConnection(e.getConnection());
+ if (e.getType() == DATA)
+ c->deliverBuffer(buf);
+ else { // control
+ AMQFrame frame;
+ while (frame.decode(buf))
+ c->deliver(frame);
+ }
+ }
+}
+
struct AddrList {
const cpg_address* addrs;
int count;
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 630de97093..7d3ef13b14 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -20,6 +20,7 @@
*/
#include "qpid/cluster/Cpg.h"
+#include "qpid/cluster/Event.h"
#include "qpid/cluster/PollableQueue.h"
#include "qpid/cluster/NoOpConnectionOutputHandler.h"
@@ -85,7 +86,7 @@ class Cluster : public RefCounted, private Cpg::Handler
/** Message sent over the cluster. */
typedef std::pair<framing::AMQFrame, ConnectionId> Message;
- typedef PollableQueue<Message> MessageQueue;
+ typedef PollableQueue<Event> EventQueue;
boost::function<void()> shutdownNext;
@@ -93,6 +94,8 @@ class Cluster : public RefCounted, private Cpg::Handler
void deliverFrame(framing::AMQFrame&, const ConnectionId&);
void deliverBuffer(const char*, size_t, const ConnectionId&);
+
+ void deliverEvent(const Event&);
/** CPG deliver callback. */
void deliver(
@@ -132,7 +135,8 @@ class Cluster : public RefCounted, private Cpg::Handler
ConnectionMap connections;
NoOpConnectionOutputHandler shadowOut;
sys::DispatchHandle cpgDispatchHandle;
-
+ PollableQueue<Event> deliverQueue;
+
friend std::ostream& operator <<(std::ostream&, const Cluster&);
friend std::ostream& operator <<(std::ostream&, const UrlMap::value_type&);
friend std::ostream& operator <<(std::ostream&, const UrlMap&);
diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp
index ff558842e4..66f3cf261b 100644
--- a/cpp/src/qpid/cluster/Event.cpp
+++ b/cpp/src/qpid/cluster/Event.cpp
@@ -25,6 +25,7 @@
namespace qpid {
namespace cluster {
+
using framing::Buffer;
const size_t Event::OVERHEAD = 1 /*type*/ + 8 /*64-bit pointr*/;
@@ -32,10 +33,14 @@ const size_t Event::OVERHEAD = 1 /*type*/ + 8 /*64-bit pointr*/;
Event::Event(EventType t, const ConnectionId c, const size_t s)
: type(t), connection(c), size(s), data(RefCountedBuffer::create(s)) {}
-Event::Event(const MemberId& m, const char* d, size_t s)
- : connection(m, 0), size(s-OVERHEAD), data(RefCountedBuffer::create(size))
-{
- memcpy(data->get(), d, s);
+Event Event::delivered(const MemberId& m, void* d, size_t s) {
+ Buffer buf(static_cast<char*>(d), s);
+ EventType type((EventType)buf.getOctet());
+ ConnectionId connection(m, reinterpret_cast<Connection*>(buf.getLongLong()));
+ assert(buf.getPosition() == OVERHEAD);
+ Event e(type, connection, s-OVERHEAD);
+ memcpy(e.getData(), static_cast<char*>(d)+OVERHEAD, s-OVERHEAD);
+ return e;
}
void Event::mcast(const Cpg::Name& name, Cpg& cpg) {
diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h
index 3e4a19f7f3..14ae253dc2 100644
--- a/cpp/src/qpid/cluster/Event.h
+++ b/cpp/src/qpid/cluster/Event.h
@@ -25,6 +25,7 @@
#include "types.h"
#include "Cpg.h"
#include "qpid/RefCountedBuffer.h"
+#include "qpid/framing/Buffer.h"
namespace qpid {
namespace cluster {
@@ -39,11 +40,11 @@ namespace cluster {
*/
struct Event {
public:
- /** Create an event with for mcasting, with size bytes of space. */
- Event(EventType t, const ConnectionId c, size_t size);
+ /** Create an event to mcast with a buffer of size bytes. */
+ Event(EventType t=DATA, const ConnectionId c=ConnectionId(), size_t size=0);
- /** Create an event from delivered data. */
- Event(const MemberId& m, const char* data, size_t size);
+ /** Create an event copied from delivered data. */
+ static Event delivered(const MemberId& m, void* data, size_t size);
void mcast(const Cpg::Name& name, Cpg& cpg);
@@ -51,6 +52,9 @@ struct Event {
ConnectionId getConnection() const { return connection; }
size_t getSize() const { return size; }
char* getData() { return data->get(); }
+ const char* getData() const { return data->get(); }
+
+ operator framing::Buffer() const { return framing::Buffer(const_cast<char*>(getData()), getSize()); }
private:
static const size_t OVERHEAD;
diff --git a/cpp/src/qpid/cluster/PollableQueue.h b/cpp/src/qpid/cluster/PollableQueue.h
index 74da2df750..1c7720f5c6 100644
--- a/cpp/src/qpid/cluster/PollableQueue.h
+++ b/cpp/src/qpid/cluster/PollableQueue.h
@@ -27,6 +27,7 @@
#include "qpid/sys/Mutex.h"
#include <boost/function.hpp>
#include <boost/bind.hpp>
+#include <algorithm>
#include <deque>
namespace qpid {
@@ -53,6 +54,15 @@ class PollableQueue {
/** Callback to process a range of items. */
typedef boost::function<void (const iterator&, const iterator&)> Callback;
+ /** Functor tempalate to create a Callback from a functor that handles a single item. */
+ template <class F> struct ForEach {
+ F handleOne;
+ ForEach(const F& f) : handleOne(f) {}
+ void operator()(const iterator& i, const iterator& j) const { std::for_each(i, j, handleOne); }
+ };
+ /** Function to create ForEach instances */
+ template <class F> static ForEach<F> forEach(const F& f) { return ForEach<F>(f); }
+
/** When the queue is selected by the poller, values are passed to callback cb. */
explicit PollableQueue(const Callback& cb);
diff --git a/cpp/src/qpid/cluster/types.h b/cpp/src/qpid/cluster/types.h
index 8911896e1f..0cd6f1afbb 100644
--- a/cpp/src/qpid/cluster/types.h
+++ b/cpp/src/qpid/cluster/types.h
@@ -34,7 +34,7 @@ namespace cluster {
class Connection;
-/** Types of cluster messages. */
+/** Types of cluster event. */
enum EventType { DATA, CONTROL };
/** first=node-id, second=pid */
diff --git a/cpp/src/tests/benchmark b/cpp/src/tests/benchmark
index d0ad698f9f..c075837847 100755
--- a/cpp/src/tests/benchmark
+++ b/cpp/src/tests/benchmark
@@ -78,7 +78,6 @@ dosamples() {
} | tee $FILE
}
-echo "benchmark $*" | tee benchmark.tab
HEADING="pub sub total Mb"
dosamples $SCRIPTDIR/perfdist --size $SIZE --count $COUNT --nsubs $NSUBS --npubs $NPUBS -s -- ${CLIENTS[*]} --- ${BROKERS[*]}
HEADING="pub"
@@ -89,6 +88,8 @@ HEADING="min max avg"
dosamples ssh -A ${CLIENTS[0]} $TESTDIR/echotest --count $ECHO -s -b ${BROKERS[0]}
echo
-echo "Tab separated spreadsheet (also stored in benchmark.tab):"
+echo "Tab separated spreadsheet (also saved as benchmark.tab):"
echo
+
+echo "benchmark -- ${CLIENTS[*]} --- ${BROKERS[*]} " | tee benchmark.tab
paste $FILES | tee -a benchmark.tab