summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-02-03 21:28:14 +0000
committerAlan Conway <aconway@apache.org>2009-02-03 21:28:14 +0000
commitd71457233eb57af17dea2e5d1dc56fdc4497da6a (patch)
tree9405c253055e2a2eb26400a50b203df160a988c0 /qpid/cpp
parent788154168e3e243953c397b06ca3f4b25e4330da (diff)
downloadqpid-python-d71457233eb57af17dea2e5d1dc56fdc4497da6a.tar.gz
Fix for race conditions in cluster join.
- ConnectionDecoder: separated from Connection. - cluster/PollableQueue: stop processing frames if PollableQueue is stopped. - move state checks in event-queue handler to frame-queue handler. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@740459 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/cluster.mk8
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp124
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h23
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterMap.cpp4
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterMap.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterQueueHandler.h56
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp33
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h6
-rw-r--r--qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp49
-rw-r--r--qpid/cpp/src/qpid/cluster/ConnectionDecoder.h60
-rw-r--r--qpid/cpp/src/qpid/cluster/ConnectionMap.cpp86
-rw-r--r--qpid/cpp/src/qpid/cluster/ConnectionMap.h70
-rw-r--r--qpid/cpp/src/qpid/cluster/Decoder.cpp46
-rw-r--r--qpid/cpp/src/qpid/cluster/Decoder.h62
-rw-r--r--qpid/cpp/src/qpid/cluster/Event.h6
-rw-r--r--qpid/cpp/src/qpid/cluster/EventFrame.cpp13
-rw-r--r--qpid/cpp/src/qpid/cluster/EventFrame.h14
-rw-r--r--qpid/cpp/src/qpid/cluster/PollableQueue.h68
-rw-r--r--qpid/cpp/src/qpid/cluster/Quorum_cman.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp5
-rw-r--r--qpid/cpp/src/qpid/cluster/types.h4
-rw-r--r--qpid/cpp/src/tests/ForkedBroker.h1
-rw-r--r--qpid/cpp/src/tests/cluster_test.cpp4
-rw-r--r--qpid/cpp/xml/cluster.xml6
25 files changed, 521 insertions, 233 deletions
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk
index 9c76bb2239..f222b1e148 100644
--- a/qpid/cpp/src/cluster.mk
+++ b/qpid/cpp/src/cluster.mk
@@ -40,7 +40,7 @@ cluster_la_SOURCES = \
$(CMAN_SOURCES) \
qpid/cluster/Cluster.cpp \
qpid/cluster/Cluster.h \
- qpid/cluster/ClusterQueueHandler.h \
+ qpid/cluster/PollableQueue.h \
qpid/cluster/ClusterMap.cpp \
qpid/cluster/ClusterMap.h \
qpid/cluster/ClusterPlugin.cpp \
@@ -49,8 +49,13 @@ cluster_la_SOURCES = \
qpid/cluster/ConnectionCodec.cpp \
qpid/cluster/ConnectionCodec.h \
qpid/cluster/ConnectionMap.h \
+ qpid/cluster/ConnectionMap.cpp \
qpid/cluster/Cpg.cpp \
qpid/cluster/Cpg.h \
+ qpid/cluster/Decoder.cpp \
+ qpid/cluster/Decoder.h \
+ qpid/cluster/ConnectionDecoder.cpp \
+ qpid/cluster/ConnectionDecoder.h \
qpid/cluster/Dispatchable.h \
qpid/cluster/UpdateClient.cpp \
qpid/cluster/UpdateClient.h \
@@ -71,6 +76,7 @@ cluster_la_SOURCES = \
qpid/cluster/ThreadDispatch.h \
qpid/cluster/ProxyInputHandler.h \
qpid/cluster/Quorum.h \
+ qpid/cluster/Updatee.h \
qpid/cluster/WriteEstimate.cpp \
qpid/cluster/WriteEstimate.h \
qpid/cluster/types.h
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 6a19b8e4ea..eaa4a720b1 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -20,7 +20,6 @@
#include "Connection.h"
#include "UpdateClient.h"
#include "FailoverExchange.h"
-#include "ClusterQueueHandler.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/SessionState.h"
@@ -92,8 +91,16 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
writeEstimate(writeEstimate_),
mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
- deliverEventQueue(ClusterQueueHandler<Event>(this, boost::bind(&Cluster::deliveredEvent, this, _1), "event queue"), poller),
- deliverFrameQueue(ClusterQueueHandler<EventFrame>(this, boost::bind(&Cluster::deliveredFrame, this, _1), "frame queue"), poller),
+ deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1),
+ boost::bind(&Cluster::leave, this),
+ "Error decoding events",
+ poller),
+ deliverFrameQueue(boost::bind(&Cluster::deliveredFrame, this, _1),
+ boost::bind(&Cluster::leave, this),
+ "Error delivering frames",
+ poller),
+ connections(*this),
+ decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1)),
state(INIT),
lastSize(0),
lastBroker(false),
@@ -121,12 +128,23 @@ Cluster::~Cluster() {
if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
}
-void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
- connections.insert(c->getId(), c);
+// Called in connection thread to insert a client connection.
+void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
+ Lock l(lock);
+ connections.insert(c);
}
-void Cluster::erase(ConnectionId id) {
+// Called in connection thread to insert an updated shadow connection.
+void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
+ Lock l(lock);
+ assert(state <= UPDATEE); // Only during update.
+ connections.insert(c);
+}
+
+void Cluster::erase(const ConnectionId& id) {
+ // Called only by Connection::deliverClose in deliver thread, no need to lock.
connections.erase(id);
+ decoder.erase(id);
}
std::vector<string> Cluster::getIds() const {
@@ -168,17 +186,7 @@ void Cluster::leave(Lock&) {
}
}
-boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& connectionId) {
- boost::intrusive_ptr<Connection> cp = connections.find(connectionId);
- if (!cp && connectionId.getMember() != myId) { // New shadow connection
- std::ostringstream mgmtId;
- mgmtId << name << ":" << connectionId;
- cp = new Connection(*this, shadowOut, mgmtId.str(), connectionId);
- connections.insert(connectionId, cp);
- }
- return cp;
-}
-
+// Deliver CPG message.
void Cluster::deliver(
cpg_handle_t /*handle*/,
cpg_name* /*group*/,
@@ -187,58 +195,52 @@ void Cluster::deliver(
void* msg,
int msg_len)
{
- Mutex::ScopedLock l(lock);
MemberId from(nodeid, pid);
framing::Buffer buf(static_cast<char*>(msg), msg_len);
Event e(Event::decodeCopy(from, buf));
e.setSequence(sequence++);
if (from == myId) // Record self-deliveries for flow control.
mcast.selfDeliver(e);
- deliver(e, l);
+ deliver(e);
}
-void Cluster::deliver(const Event& e, Lock&) {
+void Cluster::deliver(const Event& e) {
if (state == LEFT) return;
QPID_LATENCY_INIT(e);
deliverEventQueue.push(e);
}
-// Entry point: called when deliverEventQueue has events to process.
+// Handler for deliverEventQueue
void Cluster::deliveredEvent(const Event& e) {
QPID_LATENCY_RECORD("delivered event queue", e);
Buffer buf(const_cast<char*>(e.getData()), e.getSize());
- boost::intrusive_ptr<Connection> connection;
- if (e.isConnection()) {
- if (state <= UPDATEE) {
- QPID_LOG(trace, *this << " DROP: " << e);
- return;
- }
- connection = getConnection(e.getConnectionId());
- if (!connection) return;
- }
if (e.getType() == CONTROL) {
AMQFrame frame;
- while (frame.decode(buf)) {
- deliverFrameQueue.push(EventFrame(connection, e, frame));
- }
- }
- else if (e.getType() == DATA) {
- connection->deliveredEvent(e, deliverFrameQueue);
+ while (frame.decode(buf))
+ deliverFrameQueue.push(EventFrame(e, frame));
}
+ else if (e.getType() == DATA)
+ decoder.decode(e, e.getData());
}
+// Handler for deliverFrameQueue
void Cluster::deliveredFrame(const EventFrame& e) {
+ Mutex::ScopedLock l(lock);
QPID_LOG(trace, *this << " DLVR: " << e);
QPID_LATENCY_RECORD("delivered frame queue", e.frame);
- if (e.connection) {
- e.connection->deliveredFrame(e);
- }
- else {
- Mutex::ScopedLock l(lock); // FIXME aconway 2008-12-11: lock scope too big?
- ClusterDispatcher dispatch(*this, e.member, l);
+ if (e.isCluster()) { // Cluster control frame
+ ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l);
if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled())
throw Exception(QPID_MSG("Invalid cluster control"));
}
+ else { // Connection frame.
+ if (state <= UPDATEE) {
+ QPID_LOG(trace, *this << " DROP: " << e);
+ return;
+ }
+ boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId);
+ connection->deliveredFrame(e);
+ }
QPID_LATENCY_RECORD("processed", e.frame);
}
@@ -282,7 +284,13 @@ void Cluster::configChange (
std::string addresses;
for (cpg_address* p = current; p < current+nCurrent; ++p)
addresses.append(MemberId(*p).str());
- deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), myId), l);
+ deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), myId));
+}
+
+void Cluster::setReady(Lock&) {
+ state = READY;
+ if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
+ mcast.release();
}
void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& l) {
@@ -296,12 +304,9 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
if (state == INIT) { // First configChange
if (map.aliveCount() == 1) {
- setClusterId(true);
- // FIXME aconway 2008-12-11: Centralize transition to READY and associated actions eg mcast.release()
- state = READY;
- mcast.release();
QPID_LOG(notice, *this << " first in cluster");
- if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
+ setClusterId(true);
+ setReady(l);
map = ClusterMap(myId, myUrl, true);
memberUpdate(l);
}
@@ -325,9 +330,6 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
}
}
-
-
-
void Cluster::tryMakeOffer(const MemberId& id, Lock& ) {
if (state == READY && map.isJoiner(id)) {
state = OFFER;
@@ -361,11 +363,8 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
if (map.ready(id, Url(url)))
memberUpdate(l);
if (state == CATCHUP && id == myId) {
- state = READY;
- mcast.release();
QPID_LOG(notice, *this << " caught up, active cluster member");
- if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
- mcast.release();
+ setReady(l);
}
}
@@ -379,8 +378,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu
updateStart(updatee, *url, l);
}
else { // Another offer was first.
- state = READY;
- mcast.release();
+ setReady(l);
QPID_LOG(info, *this << " cancelled update offer to " << updatee);
tryMakeOffer(map.firstJoiner(), l); // Maybe make another offer.
}
@@ -390,7 +388,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu
setClusterId(uuid);
state = UPDATEE;
QPID_LOG(info, *this << " receiving update from " << updater);
- deliverEventQueue.stop();
+ deliverFrameQueue.stop();
checkUpdateIn(l);
}
}
@@ -400,7 +398,7 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock&) {
assert(state == OFFER);
state = UPDATER;
QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url);
- deliverEventQueue.stop();
+ deliverFrameQueue.stop();
if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
updateThread = Thread(
new UpdateClient(myId, updatee, url, broker, map, connections.values(),
@@ -422,7 +420,7 @@ void Cluster::checkUpdateIn(Lock& ) {
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId);
state = CATCHUP;
QPID_LOG(info, *this << " received update, starting catch-up");
- deliverEventQueue.start();
+ deliverFrameQueue.start();
}
}
@@ -432,11 +430,11 @@ void Cluster::updateOutDone() {
}
void Cluster::updateOutDone(Lock& l) {
+ QPID_LOG(info, *this << " sent update");
assert(state == UPDATER);
state = READY;
mcast.release();
- QPID_LOG(info, *this << " sent update");
- deliverEventQueue.start();
+ deliverFrameQueue.start();
tryMakeOffer(map.firstJoiner(), l); // Try another offer
}
@@ -504,8 +502,6 @@ void Cluster::memberUpdate(Lock& l) {
}
lastSize = size;
- //
-
if (mgmtObject) {
mgmtObject->set_clusterSize(size);
string urlstr;
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index 6e91ca8f64..1cfcd04c6f 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -29,9 +29,10 @@
#include "NoOpConnectionOutputHandler.h"
#include "PollerDispatch.h"
#include "Quorum.h"
+#include "Decoder.h"
+#include "PollableQueue.h"
#include "qpid/broker/Broker.h"
-#include "qpid/sys/PollableQueue.h"
#include "qpid/sys/Monitor.h"
#include "qpid/management/Manageable.h"
#include "qpid/Url.h"
@@ -73,8 +74,9 @@ class Cluster : private Cpg::Handler, public management::Manageable {
virtual ~Cluster();
// Connection map - called in connection threads.
- void insert(const ConnectionPtr&);
- void erase(ConnectionId);
+ void addLocalConnection(const ConnectionPtr&);
+ void addShadowConnection(const ConnectionPtr&);
+ void erase(const ConnectionId&);
// URLs of current cluster members - called in connection threads.
std::vector<std::string> getIds() const;
@@ -100,8 +102,8 @@ class Cluster : private Cpg::Handler, public management::Manageable {
private:
typedef sys::Monitor::ScopedLock Lock;
- typedef sys::PollableQueue<Event> PollableEventQueue;
- typedef sys::PollableQueue<EventFrame> PollableFrameQueue;
+ typedef PollableQueue<Event> PollableEventQueue;
+ typedef PollableQueue<EventFrame> PollableFrameQueue;
// NB: The final Lock& parameter on functions below is used to mark functions
// that should only be called by a function that already holds the lock.
@@ -132,6 +134,8 @@ class Cluster : private Cpg::Handler, public management::Manageable {
// Helper, called in deliver thread.
void updateStart(const MemberId& updatee, const Url& url, Lock&);
+ void setReady(Lock&);
+
void deliver( // CPG deliver callback.
cpg_handle_t /*handle*/,
struct cpg_name *group,
@@ -140,7 +144,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void* /*msg*/,
int /*msg_len*/);
- void deliver(const Event& e, Lock&);
+ void deliver(const Event&);
void configChange( // CPG config change callback.
cpg_handle_t /*handle*/,
@@ -150,8 +154,6 @@ class Cluster : private Cpg::Handler, public management::Manageable {
struct cpg_address */*joined*/, int /*nJoined*/
);
- boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&);
-
virtual qpid::management::ManagementObject* GetManagementObject() const;
virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
@@ -193,7 +195,10 @@ class Cluster : private Cpg::Handler, public management::Manageable {
boost::shared_ptr<FailoverExchange> failoverExchange;
Quorum quorum;
- // Remaining members are protected by lock.
+ // Called only from event delivery thread
+ Decoder decoder;
+
+ // Remaining members are protected by lock
mutable sys::Monitor lock;
// Local cluster state, cluster map
diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
index bcfade2b8c..9e7232180d 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
+++ b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
@@ -69,8 +69,7 @@ ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt)
std::for_each(membersFt.begin(), membersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(members), boost::ref(alive)));
}
-ClusterConnectionMembershipBody ClusterMap::asMethodBody() const {
- framing::ClusterConnectionMembershipBody b;
+void ClusterMap::toMethodBody(framing::ClusterConnectionMembershipBody& b) const {
b.getJoiners().clear();
std::for_each(joiners.begin(), joiners.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getJoiners()), _1));
for(Set::const_iterator i = alive.begin(); i != alive.end(); ++i) {
@@ -79,7 +78,6 @@ ClusterConnectionMembershipBody ClusterMap::asMethodBody() const {
}
b.getMembers().clear();
std::for_each(members.begin(), members.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getMembers()), _1));
- return b;
}
bool ClusterMap::configChange(
diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.h b/qpid/cpp/src/qpid/cluster/ClusterMap.h
index 9756daf977..4548441442 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterMap.h
+++ b/qpid/cpp/src/qpid/cluster/ClusterMap.h
@@ -71,7 +71,7 @@ class ClusterMap {
MemberId firstJoiner() const;
/** Convert map contents to a cluster control body. */
- framing::ClusterConnectionMembershipBody asMethodBody() const;
+ void toMethodBody(framing::ClusterConnectionMembershipBody&) const;
size_t aliveCount() const { return alive.size(); }
size_t memberCount() const { return members.size(); }
diff --git a/qpid/cpp/src/qpid/cluster/ClusterQueueHandler.h b/qpid/cpp/src/qpid/cluster/ClusterQueueHandler.h
deleted file mode 100644
index e843526962..0000000000
--- a/qpid/cpp/src/qpid/cluster/ClusterQueueHandler.h
+++ /dev/null
@@ -1,56 +0,0 @@
-#ifndef QPID_CLUSTER_CLUSTERQUEUEHANDLER_H
-#define QPID_CLUSTER_CLUSTERQUEUEHANDLER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "Cluster.h"
-#include "qpid/sys/PollableQueue.h"
-#include <qpid/log/Statement.h>
-
-namespace qpid {
-namespace cluster {
-
-/** Convenience functor for PollableQueue callbacks. */
-template <class T> struct ClusterQueueHandler {
- ClusterQueueHandler(Cluster& c, boost::function<void (const T&)> f, const std::string& n) : cluster(c), callback(f), name(n) {}
- ClusterQueueHandler(const Cluster* c, boost::function<void (const T&)> f, const std::string& n) : cluster(*const_cast<Cluster*>(c)), callback(f), name(n) {}
-
- void operator()(typename sys::PollableQueue<T>::Queue& values) {
- try {
- std::for_each(values.begin(), values.end(), callback);
- values.clear();
- }
- catch (const std::exception& e) {
- QPID_LOG(error, "Error on " << name << ": " << e.what());
- cluster.leave();
- }
- }
-
- Cluster& cluster;
- boost::function<void (const T&)> callback;
- std::string name;
-};
-
-
-}} // namespace qpid::cluster
-
-#endif /*!QPID_CLUSTER_CLUSTERQUEUEHANDLER_H*/
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index 9016e812be..a71950ef1d 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -75,7 +75,9 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
void Connection::init() {
QPID_LOG(debug, cluster << " new connection: " << *this);
- if (isLocal() && !isCatchUp() && cluster.getReadMax()) {
+ if (isLocalClient()) {
+ cluster.addLocalConnection(this);
+ if (cluster.getReadMax())
output.giveReadCredit(cluster.getReadMax());
}
}
@@ -99,17 +101,15 @@ void Connection::deliverDoOutput(uint32_t requested) {
// Received from a directly connected client.
void Connection::received(framing::AMQFrame& f) {
QPID_LOG(trace, cluster << " RECV " << *this << ": " << f);
- if (isLocal()) {
+ if (isLocal()) { // Local catch-up connection.
currentChannel = f.getChannel();
if (!framing::invoke(*this, *f.getBody()).wasHandled())
connection.received(f);
}
- else { // Shadow or updated ex catch-up connection.
+ else { // Shadow or updated catch-up connection.
if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) {
- if (isShadow()) {
- QPID_LOG(debug, cluster << " inserting connection " << *this);
- cluster.insert(boost::intrusive_ptr<Connection>(this));
- }
+ if (isShadow())
+ cluster.addShadowConnection(this);
AMQFrame ok((ConnectionCloseOkBody()));
connection.getOutput().send(ok);
output.setOutputHandler(discardHandler);
@@ -136,24 +136,7 @@ bool Connection::checkUnsupported(const AMQBody& body) {
return !message.empty();
}
-// Decode buffer and put frames on frameq.
-void Connection::deliveredEvent(const Event& e, PollableFrameQueue& frameq) {
- assert(!catchUp);
- Buffer buf(e);
- // Set read credit on the last frame.
- ++readCredit; // One credit per buffer.
- if (!mcastDecoder.decode(buf)) return;
- AMQFrame frame(mcastDecoder.frame);
- while (mcastDecoder.decode(buf)) {
- frameq.push(EventFrame(this, e, frame));
- frame = mcastDecoder.frame;
- }
- frameq.push(EventFrame(this, e, frame, readCredit));
- readCredit = 0;
-}
-
-
-// Delivered from cluster.
+// Called in delivery thread, in cluster order.
void Connection::deliveredFrame(const EventFrame& f) {
assert(!catchUp);
currentChannel = f.frame.getChannel();
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index e22ff05c08..160855dc2d 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -72,9 +72,11 @@ class Connection :
ConnectionId getId() const { return self; }
broker::Connection& getBrokerConnection() { return connection; }
- /** True for connections from direct clients of local broker */
+ /** Local connections may be clients or catch-up connections */
bool isLocal() const;
+ bool isLocalClient() const { return isLocal() && !isCatchUp(); }
+
/** True for connections that are shadowing remote broker connections */
bool isShadow() const;
@@ -101,7 +103,6 @@ class Connection :
size_t decode(const char* buffer, size_t size);
// Called for data delivered from the cluster.
- void deliveredEvent(const Event&, PollableFrameQueue&);
void deliveredFrame(const EventFrame&);
void consumerState(const std::string& name, bool blocked, bool notifyEnabled);
@@ -166,7 +167,6 @@ class Connection :
WriteEstimate writeEstimate;
OutputInterceptor output;
framing::FrameDecoder localDecoder;
- framing::FrameDecoder mcastDecoder;
broker::Connection connection;
framing::SequenceNumber deliverSeq;
framing::ChannelId currentChannel;
diff --git a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp b/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
index 1334f97eec..442ac1438f 100644
--- a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
+++ b/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
@@ -59,8 +59,6 @@ ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id,
{
std::auto_ptr<sys::ConnectionInputHandler> ih(new ProxyInputHandler(interceptor));
codec.setInputHandler(ih);
- if (!catchUp) // Don't put catchUp connections in the cluster map.
- cluster.insert(interceptor);
}
ConnectionCodec::~ConnectionCodec() {}
diff --git a/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp b/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp
new file mode 100644
index 0000000000..1500b6a743
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ConnectionDecoder.h"
+#include "EventFrame.h"
+
+namespace qpid {
+namespace cluster {
+
+using namespace framing;
+
+ConnectionDecoder::ConnectionDecoder(const Handler& h) : handler(h), readCredit(0) {}
+
+void ConnectionDecoder::decode(const EventHeader& eh, const void* data) {
+ assert(eh.getType() == DATA); // Only handle connection data events.
+ const char* cp = static_cast<const char*>(data);
+ Buffer buf(const_cast<char*>(cp), eh.getSize());
+ // Set read credit on the last frame in the event.
+ ++readCredit; // One credit per event = connection read buffer.
+ if (decoder.decode(buf)) { // Decoded a frame
+ AMQFrame frame(decoder.frame);
+ while (decoder.decode(buf)) {
+ handler(EventFrame(eh, frame));
+ frame = decoder.frame;
+ }
+ handler(EventFrame(eh, frame, readCredit));
+ readCredit = 0; // Reset credit for next event.
+ }
+}
+
+}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h b/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h
new file mode 100644
index 0000000000..5f139b23e9
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h
@@ -0,0 +1,60 @@
+#ifndef QPID_CLUSTER_CONNECTIONDECODER_H
+#define QPID_CLUSTER_CONNECTIONDECODER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/FrameDecoder.h"
+#include <boost/function.hpp>
+
+namespace qpid {
+namespace cluster {
+
+class EventHeader;
+class EventFrame;
+/**
+ * Decodes delivered connection data Event's as EventFrame's for a
+ * connection replica, local or shadow. Manages state for frame
+ * fragments and flow control.
+ *
+ * THREAD UNSAFE: connection events are decoded in sequence.
+ */
+class ConnectionDecoder
+{
+ public:
+ typedef boost::function<void(const EventFrame&)> Handler;
+
+ ConnectionDecoder(const Handler& h);
+
+ /** Takes EventHeader + data rather than Event so that the caller can
+ * pass a pointer to connection data or a CPG buffer directly without copy.
+ */
+ void decode(const EventHeader& eh, const void* data);
+
+ private:
+ Handler handler;
+ framing::FrameDecoder decoder;
+ int readCredit;
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_CONNECTIONDECODER_H*/
diff --git a/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp b/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp
new file mode 100644
index 0000000000..9dc6210666
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ConnectionMap.h"
+#include "Cluster.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace cluster {
+
+using framing::InternalErrorException;
+
+void ConnectionMap::insert(ConnectionPtr p) {
+ std::pair<Map::iterator, bool> ib = map.insert(Map::value_type(p->getId(), p));
+ if (!ib.second) {
+ assert(0);
+ throw InternalErrorException(QPID_MSG("Duplicate connection replica: " << p->getId()));
+ }
+}
+
+void ConnectionMap::erase(const ConnectionId& id) {
+ Map::iterator i = map.find(id);
+ if (i == map.end()) {
+ assert(0);
+ QPID_LOG(warning, "Erase non-existent connection replica: " << id);
+ }
+ map.erase(i);
+}
+
+ConnectionMap::ConnectionPtr ConnectionMap::get(const ConnectionId& id) {
+ Map::const_iterator i = map.find(id);
+ if (i == map.end()) {
+ assert(id.getMember() != cluster.getId());
+ // New remote connection, create a shadow.
+ std::ostringstream mgmtId;
+ mgmtId << id;
+ ConnectionPtr cp = new Connection(cluster, shadowOut, mgmtId.str(), id);
+ std::pair<Map::iterator, bool> ib = map.insert(Map::value_type(id, cp));
+ assert(ib.second); // FIXME aconway 2009-02-03: exception.
+ i = ib.first;
+ }
+ return i->second;
+}
+
+ConnectionMap::Vector ConnectionMap::values() const {
+ Vector result(map.size());
+ std::transform(map.begin(), map.end(), result.begin(),
+ boost::bind(&Map::value_type::second, _1));
+ return result;
+}
+
+void ConnectionMap::update(MemberId myId, const ClusterMap& cluster) {
+ for (Map::iterator i = map.begin(); i != map.end(); ) {
+ MemberId member = i->first.getMember();
+ if (member != myId && !cluster.isMember(member)) {
+ i->second->left();
+ map.erase(i++);
+ } else {
+ i++;
+ }
+ }
+}
+
+void ConnectionMap::clear() {
+ map.clear();
+}
+
+}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/ConnectionMap.h b/qpid/cpp/src/qpid/cluster/ConnectionMap.h
index c355074e75..23084796cf 100644
--- a/qpid/cpp/src/qpid/cluster/ConnectionMap.h
+++ b/qpid/cpp/src/qpid/cluster/ConnectionMap.h
@@ -24,6 +24,7 @@
#include "types.h"
#include "Connection.h"
#include "ClusterMap.h"
+#include "NoOpConnectionOutputHandler.h"
#include "qpid/sys/Mutex.h"
#include <boost/intrusive_ptr.hpp>
#include <map>
@@ -31,61 +32,48 @@
namespace qpid {
namespace cluster {
+class Cluster;
+
/**
- * Thread safe map of connections.
+ * Thread safe map of connections. The map is used in:
+ * - deliver thread to look connections and create new shadow connections.
+ * - local catch-up connection threads to add a caught-up shadow connections.
+ * - local client connection threads when local connections are created.
*/
-class ConnectionMap
-{
+class ConnectionMap {
public:
typedef boost::intrusive_ptr<cluster::Connection> ConnectionPtr;
typedef std::vector<ConnectionPtr> Vector;
- void insert(ConnectionId id, ConnectionPtr p) {
- ScopedLock l(lock);
- map.insert(Map::value_type(id,p));
- }
+ ConnectionMap(Cluster& c) : cluster(c) {}
+
+ /** Insert a local connection or a caught up shadow connection.
+ * Called in local connection thread.
+ */
+ void insert(ConnectionPtr p);
+
+ /** Erase a closed connection. Called in deliver thread. */
+ void erase(const ConnectionId& id);
- void erase(ConnectionId id) {
- ScopedLock l(lock);
- map.erase(id);
- }
+ /** Get an existing connection. */
+ ConnectionPtr get(const ConnectionId& id);
- ConnectionPtr find(ConnectionId id) const {
- ScopedLock l(lock);
- Map::const_iterator i = map.find(id);
- return i == map.end() ? ConnectionPtr() : i->second;
- }
+ /** Get connections for sending an update. */
+ Vector values() const;
- Vector values() const {
- Vector result(map.size());
- std::transform(map.begin(), map.end(), result.begin(),
- boost::bind(&Map::value_type::second, _1));
- return result;
- }
+ /** Remove connections who's members are no longer in the cluster. Deliver thread. */
+ void update(MemberId myId, const ClusterMap& cluster);
- void update(MemberId myId, const ClusterMap& cluster) {
- for (Map::iterator i = map.begin(); i != map.end(); ) {
- MemberId member = i->first.getMember();
- if (member != myId && !cluster.isMember(member)) {
- i->second->left();
- map.erase(i++);
- } else {
- i++;
- }
- }
- }
+
+ void clear();
- void clear() {
- ScopedLock l(lock);
- map.clear();
- }
+ size_t size() const;
- size_t size() const { return map.size(); }
private:
typedef std::map<ConnectionId, ConnectionPtr> Map;
- typedef sys::Mutex::ScopedLock ScopedLock;
-
- mutable sys::Mutex lock;
+
+ Cluster& cluster;
+ NoOpConnectionOutputHandler shadowOut;
Map map;
};
diff --git a/qpid/cpp/src/qpid/cluster/Decoder.cpp b/qpid/cpp/src/qpid/cluster/Decoder.cpp
new file mode 100644
index 0000000000..b2ab7c8d0f
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/Decoder.cpp
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Decoder.h"
+#include "Event.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/ptr_map.h"
+
+namespace qpid {
+namespace cluster {
+
+using namespace framing;
+
+Decoder::Decoder(const Handler& h) : handler(h) {}
+
+void Decoder::decode(const EventHeader& eh, const void* data) {
+ ConnectionId id = eh.getConnectionId();
+ std::pair<Map::iterator, bool> ib = map.insert(id, new ConnectionDecoder(handler));
+ ptr_map_ptr(ib.first)->decode(eh, data);
+}
+
+void Decoder::erase(const ConnectionId& c) {
+ Map::iterator i = map.find(c);
+ if (i != map.end()) // FIXME aconway 2009-02-03:
+ map.erase(i);
+}
+
+}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/Decoder.h b/qpid/cpp/src/qpid/cluster/Decoder.h
new file mode 100644
index 0000000000..dffd6c8f75
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/Decoder.h
@@ -0,0 +1,62 @@
+#ifndef QPID_CLUSTER_DECODER_H
+#define QPID_CLUSTER_DECODER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ConnectionDecoder.h"
+#include "types.h"
+#include <boost/ptr_container/ptr_map.hpp>
+
+namespace qpid {
+namespace cluster {
+
+class EventHeader;
+
+/**
+ * Holds a map of ConnectionDecoders. Decodes Events into EventFrames
+ * and forwards EventFrames to a handler.
+ *
+ * THREAD UNSAFE: Called sequentially with un-decoded cluster events from CPG.
+ */
+class Decoder
+{
+ public:
+ typedef boost::function<void(const EventFrame&)> Handler;
+
+ Decoder(const Handler& h);
+
+ /** Takes EventHeader + data rather than Event so that the caller can
+ * pass a pointer to connection data or a CPG buffer directly without copy.
+ */
+ void decode(const EventHeader& eh, const void* data);
+
+ /** Erase the decoder for a connection. */
+ void erase(const ConnectionId&);
+
+ private:
+ typedef boost::ptr_map<ConnectionId, ConnectionDecoder> Map;
+ Handler handler;
+ Map map;
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_DECODER_H*/
diff --git a/qpid/cpp/src/qpid/cluster/Event.h b/qpid/cpp/src/qpid/cluster/Event.h
index 5df0c96f77..f1de248f89 100644
--- a/qpid/cpp/src/qpid/cluster/Event.h
+++ b/qpid/cpp/src/qpid/cluster/Event.h
@@ -49,7 +49,12 @@ class EventHeader : public ::qpid::sys::LatencyMetricTimestamp {
EventType getType() const { return type; }
ConnectionId getConnectionId() const { return connectionId; }
MemberId getMemberId() const { return connectionId.getMember(); }
+
+ /** Size of payload data, excluding header. */
size_t getSize() const { return size; }
+ /** Size of header + payload. */
+ size_t getStoreSize() { return size + HEADER_SIZE; }
+
uint64_t getSequence() const { return sequence; }
void setSequence(uint64_t n) { sequence = n; }
@@ -88,7 +93,6 @@ class Event : public EventHeader {
// Store including header
char* getStore() { return store; }
const char* getStore() const { return store; }
- size_t getStoreSize() { return size + HEADER_SIZE; }
operator framing::Buffer() const;
diff --git a/qpid/cpp/src/qpid/cluster/EventFrame.cpp b/qpid/cpp/src/qpid/cluster/EventFrame.cpp
index c1f96ad1b2..ba01c170dd 100644
--- a/qpid/cpp/src/qpid/cluster/EventFrame.cpp
+++ b/qpid/cpp/src/qpid/cluster/EventFrame.cpp
@@ -26,21 +26,14 @@ namespace cluster {
EventFrame::EventFrame() : sequence(0) {}
-EventFrame::EventFrame(
- const boost::intrusive_ptr<Connection>& c, const Event& e,
- const framing::AMQFrame& f, int rc
-) : connection(c), member(e.getMemberId()), frame(f),
- sequence(e.getSequence()), readCredit(rc)
+EventFrame::EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc)
+ : connectionId(e.getConnectionId()), frame(f), sequence(e.getSequence()), readCredit(rc)
{
QPID_LATENCY_INIT(frame);
}
std::ostream& operator<<(std::ostream& o, const EventFrame& e) {
- if (e.connection)
- o << e.connection->getId();
- else
- o << e.member;
- return o << "/" << e.sequence << " " << e.frame << " rc=" << e.readCredit;
+ return o << e.connectionId << "/" << e.sequence << " " << e.frame << " rc=" << e.readCredit;
}
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/EventFrame.h b/qpid/cpp/src/qpid/cluster/EventFrame.h
index 2ef33b9695..7f33cedb5b 100644
--- a/qpid/cpp/src/qpid/cluster/EventFrame.h
+++ b/qpid/cpp/src/qpid/cluster/EventFrame.h
@@ -32,22 +32,21 @@
namespace qpid {
namespace cluster {
-class Connection;
-
/**
* A frame decoded from an Event.
*/
struct EventFrame
{
+ public:
EventFrame();
- EventFrame(const boost::intrusive_ptr<Connection>& c, const Event& e,
- const framing::AMQFrame& f, int rc=0);
+ EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc=0);
- bool isCluster() const { return !connection; }
- bool isConnection() const { return connection; }
+ bool isCluster() const { return !connectionId.getPointer(); }
+ bool isConnection() const { return connectionId.getPointer(); }
bool isLastInEvent() const { return readCredit; }
+
// True if this frame follows immediately after frame e.
bool follows(const EventFrame& e) const {
return sequence == e.sequence || (sequence == e.sequence+1 && e.readCredit);
@@ -55,8 +54,7 @@ struct EventFrame
bool operator<(const EventFrame& e) const { return sequence < e.sequence; }
- boost::intrusive_ptr<Connection> connection;
- MemberId member;
+ ConnectionId connectionId;
framing::AMQFrame frame;
uint64_t sequence;
int readCredit; // last frame in an event, give credit when processed.
diff --git a/qpid/cpp/src/qpid/cluster/PollableQueue.h b/qpid/cpp/src/qpid/cluster/PollableQueue.h
new file mode 100644
index 0000000000..e0422e2449
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/PollableQueue.h
@@ -0,0 +1,68 @@
+#ifndef QPID_CLUSTER_POLLABLEQUEUE_H
+#define QPID_CLUSTER_POLLABLEQUEUE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/PollableQueue.h"
+#include <qpid/log/Statement.h>
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * More convenient version of PollableQueue that handles iterating
+ * over the batch and error handling.
+ */
+template <class T> class PollableQueue : public sys::PollableQueue<T> {
+ public:
+ typedef boost::function<void (const T&)> Callback;
+ typedef boost::function<void()> ErrorCallback;
+
+ PollableQueue(Callback f, ErrorCallback err, const std::string& msg, const boost::shared_ptr<sys::Poller>& poller)
+ : sys::PollableQueue<T>(boost::bind(&PollableQueue<T>::handleBatch, this, _1), poller),
+ callback(f), error(err), message(msg) {}
+
+ void handleBatch(typename sys::PollableQueue<T>::Queue& values) {
+ try {
+ typename sys::PollableQueue<T>::Queue::iterator i = values.begin();
+ while (i != values.end() && !this->isStopped()) {
+ callback(*i);
+ ++i;
+ }
+ values.erase(values.begin(), i);
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(error, message << ": " << e.what());
+ error();
+ }
+ }
+
+ private:
+ Callback callback;
+ ErrorCallback error;
+ std::string message;
+};
+
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_POLLABLEQUEUE_H*/
diff --git a/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp b/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp
index edce1698ee..62c014fcc4 100644
--- a/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp
+++ b/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp
@@ -35,7 +35,7 @@ void Quorum::init() {
enable = true;
cman = cman_init(0);
if (cman == 0) throw ErrnoException("Can't connect to cman service");
- // FIXME aconway 2008-11-13: configurable max wait.
+ // TODO aconway 2008-11-13: configurable max wait.
for (int retry = 0; !cman_is_quorate(cman) && retry < 30; retry++) {
QPID_LOG(info, "Waiting for cluster quorum: " << sys::strError(errno));
sys::sleep(1);
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index 08f09573a4..91d4c6d3ce 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -117,7 +117,10 @@ void UpdateClient::update() {
session.close();
std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1));
- AMQFrame frame(map.asMethodBody());
+
+ ClusterConnectionMembershipBody membership;
+ map.toMethodBody(membership);
+ AMQFrame frame(membership);
client::ConnectionAccess::getImpl(connection)->handle(frame);
connection.close();
QPID_LOG(debug, updaterId << " updated state to " << updateeId << " at " << updateeUrl);
diff --git a/qpid/cpp/src/qpid/cluster/types.h b/qpid/cpp/src/qpid/cluster/types.h
index 0797d472b6..5e0d3d20e3 100644
--- a/qpid/cpp/src/qpid/cluster/types.h
+++ b/qpid/cpp/src/qpid/cluster/types.h
@@ -24,11 +24,10 @@
#include "config.h"
#include "qpid/Url.h"
-
+#include <boost/intrusive_ptr.hpp>
#include <utility>
#include <iosfwd>
#include <string>
-
#include <stdint.h>
extern "C" {
@@ -45,6 +44,7 @@ namespace qpid {
namespace cluster {
class Connection;
+typedef boost::intrusive_ptr<Connection> ConnectionPtr;
/** Types of cluster event. */
enum EventType { DATA, CONTROL };
diff --git a/qpid/cpp/src/tests/ForkedBroker.h b/qpid/cpp/src/tests/ForkedBroker.h
index bf9e9265c4..925f8011f2 100644
--- a/qpid/cpp/src/tests/ForkedBroker.h
+++ b/qpid/cpp/src/tests/ForkedBroker.h
@@ -108,6 +108,7 @@ class ForkedBroker {
std::vector<const char*> args2(args);
args2.push_back("--port=0");
args2.push_back("--mgmt-enable=no"); // TODO aconway 2008-07-16: why does mgmt cause problems?
+ if (!::getenv("QPID_TRACE") && !::getenv("QPID_LOG_ENABLE"))
args2.push_back("--log-enable=error+"); // Keep quiet except for errors.
args2.push_back(0);
execv(prog, const_cast<char* const*>(&args2[0]));
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp
index b7d28bf914..585c981afc 100644
--- a/qpid/cpp/src/tests/cluster_test.cpp
+++ b/qpid/cpp/src/tests/cluster_test.cpp
@@ -219,7 +219,7 @@ QPID_AUTO_TEST_CASE(testSequenceOptions) {
ClusterFixture cluster(1);
Client c0(cluster[0], "c0");
FieldTable args;
- args.setInt("qpid.msg_sequence", 1); // FIXME aconway 2008-11-11: works with "qpid.sequence_counter"??
+ args.setInt("qpid.msg_sequence", 1);
c0.session.queueDeclare(arg::queue="q");
c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=args);
c0.session.exchangeBind(arg::exchange="ex", arg::queue="q", arg::bindingKey="k");
@@ -452,7 +452,7 @@ QPID_AUTO_TEST_CASE(testConnectionKnownHosts) {
BOOST_CHECK_EQUAL(kb0, kb2);
}
-QPID_AUTO_TEST_CASE(UpdateConsumers) {
+QPID_AUTO_TEST_CASE(testUpdateConsumers) {
ClusterFixture cluster(1, 1);
Client c0(cluster[0], "c0");
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index e6cacb0223..c114ef0151 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -36,7 +36,7 @@
<field name="cluster-id" type="uuid"/>
</control>
-Min <control name="ready" code="0x10" label="New member is ready.">
+ <control name="ready" code="0x10" label="New member is ready.">
<field name="url" type="str16"/>
</control>
@@ -45,6 +45,7 @@ Min <control name="ready" code="0x10" label="New member is ready.">
</control>
<control name="shutdown" code="0x20" label="Shut down entire cluster"/>
+
</class>
<!-- TODO aconway 2008-09-10: support for un-attached connections. -->
@@ -53,8 +54,7 @@ Min <control name="ready" code="0x10" label="New member is ready.">
<class name="cluster-connection" code="0x81" label="Qpid clustering extensions.">
- <control name="deliver-close" code="0x2">
- </control>
+ <control name="deliver-close" code="0x2"/>
<control name="deliver-do-output" code="0x3">
<field name="bytes" type="uint32"/>