summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-02-24 19:48:54 +0000
committerAlan Conway <aconway@apache.org>2009-02-24 19:48:54 +0000
commit5996f46bccf1c0fa6bda145566d11b01064ef6dd (patch)
tree61cee350c55444ffb2ab02262c50fb2699037e7f /cpp
parent338297ff8c2c65a4226f3bc3fdd4da49269cfc9a (diff)
downloadqpid-python-5996f46bccf1c0fa6bda145566d11b01064ef6dd.tar.gz
Fixed issue with producer flow control in a cluster.
Producer flow control uses a Timer and other clock-based calculations to send flow control commands. These commands are not predictably ordered from the clusters point of view. Added getClusterOrderProxy() to SessionState. In a cluster it returns a proxy that defers sending a command to the client until it is multicast to the cluster. In a stand alone broker it is just the normal proxy. Updated producer flow control to use this proxy. Cluster flow control is turned off in shadow connections. Only the directly connected node does flow control calculations and multicasts the commands to send. All nodes sending of the commands thru SessionState to ensure consistent session state (e.g. command numbering.) git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@747528 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/cluster.mk1
-rw-r--r--cpp/src/qpid/broker/ConnectionState.h21
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp3
-rw-r--r--cpp/src/qpid/broker/SessionHandler.h18
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp17
-rw-r--r--cpp/src/qpid/broker/SessionState.h11
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp22
-rw-r--r--cpp/src/qpid/cluster/Connection.h7
-rw-r--r--cpp/src/qpid/cluster/Event.cpp14
-rw-r--r--cpp/src/qpid/cluster/Event.h7
-rw-r--r--cpp/src/qpid/cluster/EventFrame.cpp4
-rw-r--r--cpp/src/qpid/cluster/EventFrame.h3
-rw-r--r--cpp/src/qpid/cluster/McastFrameHandler.h46
-rw-r--r--cpp/src/qpid/cluster/Multicaster.cpp6
-rw-r--r--cpp/src/qpid/cluster/Multicaster.h1
-rw-r--r--cpp/src/qpid/cluster/types.h2
-rw-r--r--cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h2
-rwxr-xr-xcpp/src/tests/federated_cluster_test4
-rwxr-xr-xcpp/src/tests/ssl_test3
-rwxr-xr-xcpp/src/tests/start_cluster2
20 files changed, 163 insertions, 31 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk
index 0d34788d2e..55355d2077 100644
--- a/cpp/src/cluster.mk
+++ b/cpp/src/cluster.mk
@@ -70,6 +70,7 @@ cluster_la_SOURCES = \
qpid/cluster/FailoverExchange.h \
qpid/cluster/Multicaster.cpp \
qpid/cluster/Multicaster.h \
+ qpid/cluster/MulticastFrameHandler.h \
qpid/cluster/NoOpConnectionOutputHandler.h \
qpid/cluster/OutputInterceptor.cpp \
qpid/cluster/OutputInterceptor.h \
diff --git a/cpp/src/qpid/broker/ConnectionState.h b/cpp/src/qpid/broker/ConnectionState.h
index 0e9d211b56..0d7fbc5b3b 100644
--- a/cpp/src/qpid/broker/ConnectionState.h
+++ b/cpp/src/qpid/broker/ConnectionState.h
@@ -48,8 +48,9 @@ class ConnectionState : public ConnectionToken, public management::Manageable
heartbeatmax(120),
stagingThreshold(broker.getStagingThreshold()),
federationLink(true),
- clientSupportsThrottling(false)
- {}
+ clientSupportsThrottling(false),
+ clusterOrderOut(0)
+ {}
virtual ~ConnectionState () {}
@@ -75,7 +76,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable
const string& getFederationPeerTag() const { return federationPeerTag; }
std::vector<Url>& getKnownHosts() { return knownHosts; }
- void setClientThrottling() { clientSupportsThrottling = true; }
+ void setClientThrottling(bool set=true) { clientSupportsThrottling = set; }
bool getClientThrottling() const { return clientSupportsThrottling; }
Broker& getBroker() { return broker; }
@@ -86,11 +87,20 @@ class ConnectionState : public ConnectionToken, public management::Manageable
//contained output tasks
sys::AggregateOutput outputTasks;
- sys::ConnectionOutputHandlerPtr& getOutput() { return out; }
+ sys::ConnectionOutputHandler& getOutput() { return out; }
framing::ProtocolVersion getVersion() const { return version; }
-
void setOutputHandler(qpid::sys::ConnectionOutputHandler* o) { out.set(o); }
+ /**
+ * If the broker is part of a cluster, this is a handler provided
+ * by cluster code. It ensures consistent ordering of commands
+ * that are sent based on criteria that are not predictably
+ * ordered cluster-wide, e.g. a timer firing.
+ */
+ framing::FrameHandler* getClusterOrderOutput() { return clusterOrderOut; }
+ void setClusterOrderOutput(framing::FrameHandler& fh) { clusterOrderOut = &fh; }
+
+
protected:
framing::ProtocolVersion version;
uint32_t framemax;
@@ -103,6 +113,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable
string federationPeerTag;
std::vector<Url> knownHosts;
bool clientSupportsThrottling;
+ framing::FrameHandler* clusterOrderOut;
};
}}
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index 2c4de478f6..5bdc1e2500 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -34,7 +34,8 @@ using namespace qpid::sys;
SessionHandler::SessionHandler(Connection& c, ChannelId ch)
: amqp_0_10::SessionHandler(&c.getOutput(), ch),
connection(c),
- proxy(out)
+ proxy(out),
+ clusterOrderProxy(c.getClusterOrderOutput() ? new SetChannelProxy(ch, c.getClusterOrderOutput()) : 0)
{}
SessionHandler::~SessionHandler() {}
diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h
index 7449db1560..698e4f397f 100644
--- a/cpp/src/qpid/broker/SessionHandler.h
+++ b/cpp/src/qpid/broker/SessionHandler.h
@@ -54,6 +54,17 @@ class SessionHandler : public amqp_0_10::SessionHandler {
framing::AMQP_ClientProxy& getProxy() { return proxy; }
const framing::AMQP_ClientProxy& getProxy() const { return proxy; }
+ /**
+ * If commands are sent based on the local time (e.g. in timers), they don't have
+ * a well-defined ordering across cluster nodes.
+ * This proxy is for sending such commands. In a clustered broker it will take steps
+ * to synchronize command order across the cluster. In a stand-alone broker
+ * it is just a synonym for getProxy()
+ */
+ framing::AMQP_ClientProxy& getClusterOrderProxy() {
+ return clusterOrderProxy.get() ? *clusterOrderProxy : proxy;
+ }
+
virtual void handleDetach();
// Overrides
@@ -69,9 +80,16 @@ class SessionHandler : public amqp_0_10::SessionHandler {
virtual void readyToSend();
private:
+ struct SetChannelProxy : public framing::AMQP_ClientProxy { // Proxy that sets the channel.
+ framing::ChannelHandler setChannel;
+ SetChannelProxy(uint16_t ch, framing::FrameHandler* out)
+ : framing::AMQP_ClientProxy(setChannel), setChannel(ch, out) {}
+ };
+
Connection& connection;
framing::AMQP_ClientProxy proxy;
std::auto_ptr<SessionState> session;
+ std::auto_ptr<SetChannelProxy> clusterOrderProxy;
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index dffc7cf6af..b64fc20787 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -66,7 +66,7 @@ SessionState::SessionState(
uint32_t maxRate = broker.getOptions().maxSessionRate;
if (maxRate) {
if (handler->getConnection().getClientThrottling()) {
- rateFlowcontrol = new RateFlowcontrol(maxRate);
+ rateFlowcontrol.reset(new RateFlowcontrol(maxRate));
} else {
QPID_LOG(warning, getId() << ": Unable to flow control client - client doesn't support");
}
@@ -210,7 +210,6 @@ struct ScheduledCreditTask : public TimerTask {
{}
void fire() {
- QPID_LOG(critical, "ScheduledCreditTask fired"); // FIXME aconway 2009-02-23: REMOVE
// This is the best we can currently do to avoid a destruction/fire race
if (!isCancelled()) {
if ( !sessionState.processSendCredit(0) ) {
@@ -275,7 +274,8 @@ bool SessionState::processSendCredit(uint32_t msgs)
if ( msgs > 0 && rateFlowcontrol->flowStopped() ) {
QPID_LOG(warning, getId() << ": producer throttling violation");
// TODO: Probably do message.stop("") first time then disconnect
- getProxy().getMessage().stop("");
+ // See comment on getClusterOrderProxy() in .h file
+ getClusterOrderProxy().getMessage().stop("");
return true;
}
AbsTime now = AbsTime::now();
@@ -283,7 +283,7 @@ bool SessionState::processSendCredit(uint32_t msgs)
if (mgmtObject) mgmtObject->dec_clientCredit(msgs);
if ( sendCredit>0 ) {
QPID_LOG(debug, getId() << ": send producer credit " << sendCredit);
- getProxy().getMessage().flow("", 0, sendCredit);
+ getClusterOrderProxy().getMessage().flow("", 0, sendCredit);
rateFlowcontrol->sentCredit(now, sendCredit);
if (mgmtObject) mgmtObject->inc_clientCredit(sendCredit);
return true;
@@ -364,8 +364,9 @@ void SessionState::readyToSend() {
// Issue initial credit - use a heuristic here issue min of 300 messages or 1 secs worth
uint32_t credit = std::min(rateFlowcontrol->getRate(), 300U);
QPID_LOG(debug, getId() << ": Issuing producer message credit " << credit);
- getProxy().getMessage().setFlowMode("", 0);
- getProxy().getMessage().flow("", 0, credit);
+ // See comment on getClusterOrderProxy() in .h file
+ getClusterOrderProxy().getMessage().setFlowMode("", 0);
+ getClusterOrderProxy().getMessage().flow("", 0, credit);
rateFlowcontrol->sentCredit(AbsTime::now(), credit);
if (mgmtObject) mgmtObject->inc_clientCredit(credit);
}
@@ -373,4 +374,8 @@ void SessionState::readyToSend() {
Broker& SessionState::getBroker() { return broker; }
+framing::AMQP_ClientProxy& SessionState::getClusterOrderProxy() {
+ return handler->getClusterOrderProxy();
+}
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index c435a741f8..b64461eb86 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -125,6 +125,15 @@ class SessionState : public qpid::SessionState,
void sendAcceptAndCompletion();
+ /**
+ * If commands are sent based on the local time (e.g. in timers), they don't have
+ * a well-defined ordering across cluster nodes.
+ * This proxy is for sending such commands. In a clustered broker it will take steps
+ * to synchronize command order across the cluster. In a stand-alone broker
+ * it is just a synonym for getProxy()
+ */
+ framing::AMQP_ClientProxy& getClusterOrderProxy();
+
Broker& broker;
SessionHandler* handler;
sys::AbsTime expiry; // Used by SessionManager.
@@ -138,7 +147,7 @@ class SessionState : public qpid::SessionState,
// State used for producer flow control (rate limited)
qpid::sys::Mutex rateLock;
- RateFlowcontrol* rateFlowcontrol;
+ boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol;
boost::intrusive_ptr<TimerTask> flowControlTimer;
friend class SessionManager;
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 1a3f7c4ef7..9c2b4f1638 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -62,7 +62,8 @@ NoOpConnectionOutputHandler Connection::discardHandler;
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& wrappedId, ConnectionId myId)
: cluster(c), self(myId), catchUp(false), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId), expectProtocolHeader(false)
+ connection(&output, cluster.getBroker(), wrappedId), expectProtocolHeader(false),
+ mcastFrameHandler(cluster.getMulticast(), self)
{ init(); }
// Local connections
@@ -70,15 +71,20 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink)
: cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out),
connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ? ++catchUpId : 0),
- expectProtocolHeader(isLink)
+ expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self)
{ init(); }
void Connection::init() {
QPID_LOG(debug, cluster << " new connection: " << *this);
- if (isLocalClient()) {
+ if (isLocalClient()) {
+ connection.setClusterOrderOutput(mcastFrameHandler); // Actively send cluster-order frames from local node
cluster.addLocalConnection(this);
giveReadCredit(cluster.getReadMax());
}
+ else { // Shadow or catch-up connection
+ connection.setClusterOrderOutput(nullFrameHandler); // Passive, discard cluster-order frames
+ connection.setClientThrottling(false); // Disable client throttling, done by active node.
+ }
}
void Connection::giveReadCredit(int credit) {
@@ -143,7 +149,15 @@ void Connection::deliveredFrame(const EventFrame& f) {
if (!framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol.
&& !checkUnsupported(*f.frame.getBody())) // Unsupported operation.
{
- connection.received(const_cast<AMQFrame&>(f.frame)); // Pass to broker connection.
+ // FIXME aconway 2009-02-24: Using the DATA/CONTROL
+ // distinction to distinguish incoming vs. outgoing frames is
+ // very unclear.
+ if (f.type == DATA) // incoming data frames to broker::Connection
+ connection.received(const_cast<AMQFrame&>(f.frame));
+ else { // outgoing data frame, send via SessionState
+ broker::SessionState* ss = connection.getChannel(f.frame.getChannel()).getSession();
+ if (ss) ss->out(const_cast<AMQFrame&>(f.frame));
+ }
}
giveReadCredit(f.readCredit);
}
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 98b47e1bc0..cefea00262 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -27,6 +27,7 @@
#include "OutputInterceptor.h"
#include "NoOpConnectionOutputHandler.h"
#include "EventFrame.h"
+#include "McastFrameHandler.h"
#include "qpid/broker/Connection.h"
#include "qpid/amqp_0_10/Connection.h"
@@ -150,6 +151,10 @@ class Connection :
void giveReadCredit(int credit);
private:
+ struct NullFrameHandler : public framing::FrameHandler {
+ void handle(framing::AMQFrame&) {}
+ };
+
void init();
bool checkUnsupported(const framing::AMQBody& body);
void deliverClose();
@@ -174,6 +179,8 @@ class Connection :
framing::ChannelId currentChannel;
boost::shared_ptr<broker::TxBuffer> txBuffer;
bool expectProtocolHeader;
+ McastFrameHandler mcastFrameHandler;
+ NullFrameHandler nullFrameHandler;
static qpid::sys::AtomicValue<uint64_t> catchUpId;
diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp
index e30b961b3e..9fe5376bc5 100644
--- a/cpp/src/qpid/cluster/Event.cpp
+++ b/cpp/src/qpid/cluster/Event.cpp
@@ -74,14 +74,17 @@ Event Event::decodeCopy(const MemberId& m, framing::Buffer& buf) {
return e;
}
-Event Event::control(const framing::AMQBody& body, const ConnectionId& cid) {
- framing::AMQFrame f(body);
+Event Event::control(const framing::AMQFrame& f, const ConnectionId& cid) {
Event e(CONTROL, cid, f.encodedSize());
Buffer buf(e);
f.encode(buf);
return e;
}
+Event Event::control(const framing::AMQBody& body, const ConnectionId& cid) {
+ return control(framing::AMQFrame(body), cid);
+}
+
iovec Event::toIovec() {
encodeHeader();
iovec iov = { const_cast<char*>(getStore()), getStoreSize() };
@@ -110,10 +113,13 @@ Event::operator Buffer() const {
static const char* EVENT_TYPE_NAMES[] = { "data", "control" };
+std::ostream& operator << (std::ostream& o, EventType t) {
+ return o << EVENT_TYPE_NAMES[t];
+}
+
std::ostream& operator << (std::ostream& o, const EventHeader& e) {
o << "[event " << e.getConnectionId() << "/" << e.getSequence()
- << " " << EVENT_TYPE_NAMES[e.getType()]
- << " " << e.getSize() << " bytes]";
+ << " " << e.getType() << " " << e.getSize() << " bytes]";
return o;
}
diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h
index f1de248f89..1338ea7413 100644
--- a/cpp/src/qpid/cluster/Event.h
+++ b/cpp/src/qpid/cluster/Event.h
@@ -34,6 +34,7 @@ namespace qpid {
namespace framing {
class AMQBody;
+class AMQFrame;
class Buffer;
}
@@ -83,8 +84,11 @@ class Event : public EventHeader {
/** Create an event copied from delivered data. */
static Event decodeCopy(const MemberId& m, framing::Buffer&);
- /** Create an event containing a control */
+ /** Create a control event. */
static Event control(const framing::AMQBody&, const ConnectionId&);
+
+ /** Create a control event. */
+ static Event control(const framing::AMQFrame&, const ConnectionId&);
// Data excluding header.
char* getData() { return store + HEADER_SIZE; }
@@ -105,6 +109,7 @@ class Event : public EventHeader {
};
std::ostream& operator << (std::ostream&, const EventHeader&);
+
}} // namespace qpid::cluster
#endif /*!QPID_CLUSTER_EVENT_H*/
diff --git a/cpp/src/qpid/cluster/EventFrame.cpp b/cpp/src/qpid/cluster/EventFrame.cpp
index ba01c170dd..48c9eab958 100644
--- a/cpp/src/qpid/cluster/EventFrame.cpp
+++ b/cpp/src/qpid/cluster/EventFrame.cpp
@@ -27,13 +27,13 @@ namespace cluster {
EventFrame::EventFrame() : sequence(0) {}
EventFrame::EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc)
- : connectionId(e.getConnectionId()), frame(f), sequence(e.getSequence()), readCredit(rc)
+ : connectionId(e.getConnectionId()), frame(f), sequence(e.getSequence()), readCredit(rc), type(e.getType())
{
QPID_LATENCY_INIT(frame);
}
std::ostream& operator<<(std::ostream& o, const EventFrame& e) {
- return o << e.connectionId << "/" << e.sequence << " " << e.frame << " rc=" << e.readCredit;
+ return o << e.connectionId << "/" << e.sequence << " " << e.frame << " rc=" << e.readCredit << " type=" << e.type;
}
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/EventFrame.h b/cpp/src/qpid/cluster/EventFrame.h
index 7f33cedb5b..ef3c38658b 100644
--- a/cpp/src/qpid/cluster/EventFrame.h
+++ b/cpp/src/qpid/cluster/EventFrame.h
@@ -57,7 +57,8 @@ struct EventFrame
ConnectionId connectionId;
framing::AMQFrame frame;
uint64_t sequence;
- int readCredit; // last frame in an event, give credit when processed.
+ int readCredit; ///< last frame in an event, give credit when processed.
+ EventType type;
};
std::ostream& operator<<(std::ostream& o, const EventFrame& e);
diff --git a/cpp/src/qpid/cluster/McastFrameHandler.h b/cpp/src/qpid/cluster/McastFrameHandler.h
new file mode 100644
index 0000000000..5127c31c84
--- /dev/null
+++ b/cpp/src/qpid/cluster/McastFrameHandler.h
@@ -0,0 +1,46 @@
+#ifndef QPID_CLUSTER_MCASTFRAMEHANDLER_H
+#define QPID_CLUSTER_MCASTFRAMEHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "Multicaster.h"
+#include "qpid/framing/FrameHandler.h"
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * A frame handler that multicasts frames as CONTROL events.
+ */
+class McastFrameHandler : public framing::FrameHandler
+{
+ public:
+ McastFrameHandler(Multicaster& m, const ConnectionId& cid) : mcast(m), connection(cid) {}
+ void handle(framing::AMQFrame& frame) { mcast.mcastControl(frame, connection); }
+ private:
+ Multicaster& mcast;
+ ConnectionId connection;
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_MCASTFRAMEHANDLER_H*/
diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp
index 239b3f5f35..f0738ab08f 100644
--- a/cpp/src/qpid/cluster/Multicaster.cpp
+++ b/cpp/src/qpid/cluster/Multicaster.cpp
@@ -24,6 +24,7 @@
#include "qpid/log/Statement.h"
#include "qpid/sys/LatencyMetric.h"
#include "qpid/framing/AMQBody.h"
+#include "qpid/framing/AMQFrame.h"
namespace qpid {
namespace cluster {
@@ -43,6 +44,11 @@ void Multicaster::mcastControl(const framing::AMQBody& body, const ConnectionId&
mcast(Event::control(body, id));
}
+void Multicaster::mcastControl(const framing::AMQFrame& frame, const ConnectionId& id) {
+ QPID_LOG(trace, "MCAST " << id << ": " << frame);
+ mcast(Event::control(frame, id));
+}
+
void Multicaster::mcastBuffer(const char* data, size_t size, const ConnectionId& id) {
Event e(DATA, id, size);
memcpy(e.getData(), data, size);
diff --git a/cpp/src/qpid/cluster/Multicaster.h b/cpp/src/qpid/cluster/Multicaster.h
index 1dfee47bd5..d1c3115977 100644
--- a/cpp/src/qpid/cluster/Multicaster.h
+++ b/cpp/src/qpid/cluster/Multicaster.h
@@ -50,6 +50,7 @@ class Multicaster
boost::function<void()> onError
);
void mcastControl(const framing::AMQBody& controlBody, const ConnectionId&);
+ void mcastControl(const framing::AMQFrame& controlFrame, const ConnectionId&);
void mcastBuffer(const char*, size_t, const ConnectionId&);
void mcast(const Event& e);
/** End holding mode, held events are mcast */
diff --git a/cpp/src/qpid/cluster/types.h b/cpp/src/qpid/cluster/types.h
index d1d6fdc427..30454d9fbb 100644
--- a/cpp/src/qpid/cluster/types.h
+++ b/cpp/src/qpid/cluster/types.h
@@ -78,6 +78,8 @@ struct ConnectionId : public std::pair<MemberId, Connection*> {
std::ostream& operator<<(std::ostream&, const ConnectionId&);
+std::ostream& operator << (std::ostream&, EventType);
+
}} // namespace qpid::cluster
#endif /*!QPID_CLUSTER_TYPES_H*/
diff --git a/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h b/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
index df6de89982..32809d86a1 100644
--- a/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
+++ b/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
@@ -30,7 +30,7 @@ namespace sys {
/**
* A ConnectionOutputHandler that delegates to another
* ConnectionOutputHandler. Allows the "real" ConnectionOutputHandler
- * to be changed modified without updating all the pointers/references
+ * to be changed without updating all the pointers/references
* using the ConnectionOutputHandlerPtr
*/
class ConnectionOutputHandlerPtr : public ConnectionOutputHandler
diff --git a/cpp/src/tests/federated_cluster_test b/cpp/src/tests/federated_cluster_test
index ef0db4cb8e..8c2117fb8c 100755
--- a/cpp/src/tests/federated_cluster_test
+++ b/cpp/src/tests/federated_cluster_test
@@ -22,7 +22,7 @@
# Test reliability of the replication feature in the face of link
# failures:
srcdir=`dirname $0`
-PYTHON_DIR=${srcdir}/../../../python
+PYTHON_DIR=$srcdir/../../../python
trap stop_brokers EXIT
@@ -37,7 +37,7 @@ stop_brokers() {
unset BROKER_A
fi
if [[ $NODE_1 ]] ; then
- ./stop_cluster
+ $srcdir/stop_cluster
unset NODE_1
fi
}
diff --git a/cpp/src/tests/ssl_test b/cpp/src/tests/ssl_test
index c7b59b62ef..13965f3a03 100755
--- a/cpp/src/tests/ssl_test
+++ b/cpp/src/tests/ssl_test
@@ -39,8 +39,7 @@ create_certs() {
}
start_broker() {
- ../qpidd --daemon --transport ssl --port 0 --ssl-port 0 --no-data-dir --no-module-dir --auth no --config $CONFIG --load-module ../.libs/ssl.so --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE > qpidd.port
- PORT=`cat qpidd.port`
+ PORT=`../qpidd --daemon --transport ssl --port 0 --ssl-port 0 --no-data-dir --no-module-dir --auth no --config $CONFIG --load-module ../.libs/ssl.so --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE`
}
stop_broker() {
diff --git a/cpp/src/tests/start_cluster b/cpp/src/tests/start_cluster
index 4f0516500c..48e299d942 100755
--- a/cpp/src/tests/start_cluster
+++ b/cpp/src/tests/start_cluster
@@ -32,7 +32,7 @@ test -f cluster.ports && { echo "cluster.ports file already exists" ; exit 1; }
rm -f cluster*.log
SIZE=${1:-1}; shift
CLUSTER=`pwd` # Cluster name=pwd, avoid clashes.
-OPTS="-d --no-module-dir --load-module ../.libs/cluster.so --cluster-name=$CLUSTER --no-data-dir --auth=no $*"
+OPTS="-d --no-module-dir --load-module ../.libs/cluster.so --cluster-name=$CLUSTER --no-data-dir --auth=no $@"
for (( i=0; i<SIZE; ++i )); do
PORT=`with_ais_group ../qpidd -p0 --log-to-file=cluster$i.log $OPTS` || exit 1