summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp48
1 files changed, 25 insertions, 23 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 4d54a837ca..2b12e4f54a 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -65,12 +65,14 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
boost::bind(&Cluster::dispatch, this, _1), // read
0, // write
boost::bind(&Cluster::disconnect, this, _1) // disconnect
- )
+ ),
+ deliverQueue(EventQueue::forEach(boost::bind(&Cluster::deliverEvent, this, _1)))
{
broker->addFinalizer(boost::bind(&Cluster::leave, this));
QPID_LOG(notice, "Joining cluster: " << name.str() << " as " << self);
cpg.join(name);
- // Start dispatching from the poller.
+
+ deliverQueue.start(poller);
cpgDispatchHandle.startWatch(poller);
}
@@ -173,27 +175,7 @@ void Cluster::deliver(
{
try {
MemberId from(nodeid, pid);
- Buffer buf(static_cast<char*>(msg), msg_len);
- Connection* connection;
- uint8_t type = buf.getOctet();
- decodePtr(buf, connection);
- if (connection == 0) { // Cluster controls
- AMQFrame frame;
- while (frame.decode(buf))
- if (!ClusterOperations(*this, from).invoke(frame))
- throw Exception("Invalid cluster control");
- }
- else { // Connection data or control
- boost::intrusive_ptr<Connection> c =
- getConnection(ConnectionId(from, connection));
- if (type == DATA)
- c->deliverBuffer(buf);
- else {
- AMQFrame frame;
- while (frame.decode(buf))
- c->deliver(frame);
- }
- }
+ deliverQueue.push(Event::delivered(from, msg, msg_len));
}
catch (const std::exception& e) {
// FIXME aconway 2008-01-30: exception handling.
@@ -203,6 +185,26 @@ void Cluster::deliver(
}
}
+void Cluster::deliverEvent(const Event& e) {
+ Buffer buf(e);
+ if (e.getConnection().getConnectionPtr() == 0) { // Cluster control
+ AMQFrame frame;
+ while (frame.decode(buf))
+ if (!ClusterOperations(*this, e.getConnection().getMember()).invoke(frame))
+ throw Exception("Invalid cluster control");
+ }
+ else { // Connection data or control
+ boost::intrusive_ptr<Connection> c = getConnection(e.getConnection());
+ if (e.getType() == DATA)
+ c->deliverBuffer(buf);
+ else { // control
+ AMQFrame frame;
+ while (frame.decode(buf))
+ c->deliver(frame);
+ }
+ }
+}
+
struct AddrList {
const cpg_address* addrs;
int count;