summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/cluster.mk1
-rw-r--r--cpp/src/qpid/broker/ConnectionState.h21
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp3
-rw-r--r--cpp/src/qpid/broker/SessionHandler.h18
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp17
-rw-r--r--cpp/src/qpid/broker/SessionState.h11
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp22
-rw-r--r--cpp/src/qpid/cluster/Connection.h7
-rw-r--r--cpp/src/qpid/cluster/Event.cpp14
-rw-r--r--cpp/src/qpid/cluster/Event.h7
-rw-r--r--cpp/src/qpid/cluster/EventFrame.cpp4
-rw-r--r--cpp/src/qpid/cluster/EventFrame.h3
-rw-r--r--cpp/src/qpid/cluster/McastFrameHandler.h46
-rw-r--r--cpp/src/qpid/cluster/Multicaster.cpp6
-rw-r--r--cpp/src/qpid/cluster/Multicaster.h1
-rw-r--r--cpp/src/qpid/cluster/types.h2
-rw-r--r--cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h2
-rwxr-xr-xcpp/src/tests/federated_cluster_test4
-rwxr-xr-xcpp/src/tests/ssl_test3
-rwxr-xr-xcpp/src/tests/start_cluster2
20 files changed, 163 insertions, 31 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk
index 0d34788d2e..55355d2077 100644
--- a/cpp/src/cluster.mk
+++ b/cpp/src/cluster.mk
@@ -70,6 +70,7 @@ cluster_la_SOURCES = \
qpid/cluster/FailoverExchange.h \
qpid/cluster/Multicaster.cpp \
qpid/cluster/Multicaster.h \
+ qpid/cluster/MulticastFrameHandler.h \
qpid/cluster/NoOpConnectionOutputHandler.h \
qpid/cluster/OutputInterceptor.cpp \
qpid/cluster/OutputInterceptor.h \
diff --git a/cpp/src/qpid/broker/ConnectionState.h b/cpp/src/qpid/broker/ConnectionState.h
index 0e9d211b56..0d7fbc5b3b 100644
--- a/cpp/src/qpid/broker/ConnectionState.h
+++ b/cpp/src/qpid/broker/ConnectionState.h
@@ -48,8 +48,9 @@ class ConnectionState : public ConnectionToken, public management::Manageable
heartbeatmax(120),
stagingThreshold(broker.getStagingThreshold()),
federationLink(true),
- clientSupportsThrottling(false)
- {}
+ clientSupportsThrottling(false),
+ clusterOrderOut(0)
+ {}
virtual ~ConnectionState () {}
@@ -75,7 +76,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable
const string& getFederationPeerTag() const { return federationPeerTag; }
std::vector<Url>& getKnownHosts() { return knownHosts; }
- void setClientThrottling() { clientSupportsThrottling = true; }
+ void setClientThrottling(bool set=true) { clientSupportsThrottling = set; }
bool getClientThrottling() const { return clientSupportsThrottling; }
Broker& getBroker() { return broker; }
@@ -86,11 +87,20 @@ class ConnectionState : public ConnectionToken, public management::Manageable
//contained output tasks
sys::AggregateOutput outputTasks;
- sys::ConnectionOutputHandlerPtr& getOutput() { return out; }
+ sys::ConnectionOutputHandler& getOutput() { return out; }
framing::ProtocolVersion getVersion() const { return version; }
-
void setOutputHandler(qpid::sys::ConnectionOutputHandler* o) { out.set(o); }
+ /**
+ * If the broker is part of a cluster, this is a handler provided
+ * by cluster code. It ensures consistent ordering of commands
+ * that are sent based on criteria that are not predictably
+ * ordered cluster-wide, e.g. a timer firing.
+ */
+ framing::FrameHandler* getClusterOrderOutput() { return clusterOrderOut; }
+ void setClusterOrderOutput(framing::FrameHandler& fh) { clusterOrderOut = &fh; }
+
+
protected:
framing::ProtocolVersion version;
uint32_t framemax;
@@ -103,6 +113,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable
string federationPeerTag;
std::vector<Url> knownHosts;
bool clientSupportsThrottling;
+ framing::FrameHandler* clusterOrderOut;
};
}}
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index 2c4de478f6..5bdc1e2500 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -34,7 +34,8 @@ using namespace qpid::sys;
SessionHandler::SessionHandler(Connection& c, ChannelId ch)
: amqp_0_10::SessionHandler(&c.getOutput(), ch),
connection(c),
- proxy(out)
+ proxy(out),
+ clusterOrderProxy(c.getClusterOrderOutput() ? new SetChannelProxy(ch, c.getClusterOrderOutput()) : 0)
{}
SessionHandler::~SessionHandler() {}
diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h
index 7449db1560..698e4f397f 100644
--- a/cpp/src/qpid/broker/SessionHandler.h
+++ b/cpp/src/qpid/broker/SessionHandler.h
@@ -54,6 +54,17 @@ class SessionHandler : public amqp_0_10::SessionHandler {
framing::AMQP_ClientProxy& getProxy() { return proxy; }
const framing::AMQP_ClientProxy& getProxy() const { return proxy; }
+ /**
+ * If commands are sent based on the local time (e.g. in timers), they don't have
+ * a well-defined ordering across cluster nodes.
+ * This proxy is for sending such commands. In a clustered broker it will take steps
+ * to synchronize command order across the cluster. In a stand-alone broker
+ * it is just a synonym for getProxy()
+ */
+ framing::AMQP_ClientProxy& getClusterOrderProxy() {
+ return clusterOrderProxy.get() ? *clusterOrderProxy : proxy;
+ }
+
virtual void handleDetach();
// Overrides
@@ -69,9 +80,16 @@ class SessionHandler : public amqp_0_10::SessionHandler {
virtual void readyToSend();
private:
+ struct SetChannelProxy : public framing::AMQP_ClientProxy { // Proxy that sets the channel.
+ framing::ChannelHandler setChannel;
+ SetChannelProxy(uint16_t ch, framing::FrameHandler* out)
+ : framing::AMQP_ClientProxy(setChannel), setChannel(ch, out) {}
+ };
+
Connection& connection;
framing::AMQP_ClientProxy proxy;
std::auto_ptr<SessionState> session;
+ std::auto_ptr<SetChannelProxy> clusterOrderProxy;
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index dffc7cf6af..b64fc20787 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -66,7 +66,7 @@ SessionState::SessionState(
uint32_t maxRate = broker.getOptions().maxSessionRate;
if (maxRate) {
if (handler->getConnection().getClientThrottling()) {
- rateFlowcontrol = new RateFlowcontrol(maxRate);
+ rateFlowcontrol.reset(new RateFlowcontrol(maxRate));
} else {
QPID_LOG(warning, getId() << ": Unable to flow control client - client doesn't support");
}
@@ -210,7 +210,6 @@ struct ScheduledCreditTask : public TimerTask {
{}
void fire() {
- QPID_LOG(critical, "ScheduledCreditTask fired"); // FIXME aconway 2009-02-23: REMOVE
// This is the best we can currently do to avoid a destruction/fire race
if (!isCancelled()) {
if ( !sessionState.processSendCredit(0) ) {
@@ -275,7 +274,8 @@ bool SessionState::processSendCredit(uint32_t msgs)
if ( msgs > 0 && rateFlowcontrol->flowStopped() ) {
QPID_LOG(warning, getId() << ": producer throttling violation");
// TODO: Probably do message.stop("") first time then disconnect
- getProxy().getMessage().stop("");
+ // See comment on getClusterOrderProxy() in .h file
+ getClusterOrderProxy().getMessage().stop("");
return true;
}
AbsTime now = AbsTime::now();
@@ -283,7 +283,7 @@ bool SessionState::processSendCredit(uint32_t msgs)
if (mgmtObject) mgmtObject->dec_clientCredit(msgs);
if ( sendCredit>0 ) {
QPID_LOG(debug, getId() << ": send producer credit " << sendCredit);
- getProxy().getMessage().flow("", 0, sendCredit);
+ getClusterOrderProxy().getMessage().flow("", 0, sendCredit);
rateFlowcontrol->sentCredit(now, sendCredit);
if (mgmtObject) mgmtObject->inc_clientCredit(sendCredit);
return true;
@@ -364,8 +364,9 @@ void SessionState::readyToSend() {
// Issue initial credit - use a heuristic here issue min of 300 messages or 1 secs worth
uint32_t credit = std::min(rateFlowcontrol->getRate(), 300U);
QPID_LOG(debug, getId() << ": Issuing producer message credit " << credit);
- getProxy().getMessage().setFlowMode("", 0);
- getProxy().getMessage().flow("", 0, credit);
+ // See comment on getClusterOrderProxy() in .h file
+ getClusterOrderProxy().getMessage().setFlowMode("", 0);
+ getClusterOrderProxy().getMessage().flow("", 0, credit);
rateFlowcontrol->sentCredit(AbsTime::now(), credit);
if (mgmtObject) mgmtObject->inc_clientCredit(credit);
}
@@ -373,4 +374,8 @@ void SessionState::readyToSend() {
Broker& SessionState::getBroker() { return broker; }
+framing::AMQP_ClientProxy& SessionState::getClusterOrderProxy() {
+ return handler->getClusterOrderProxy();
+}
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index c435a741f8..b64461eb86 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -125,6 +125,15 @@ class SessionState : public qpid::SessionState,
void sendAcceptAndCompletion();
+ /**
+ * If commands are sent based on the local time (e.g. in timers), they don't have
+ * a well-defined ordering across cluster nodes.
+ * This proxy is for sending such commands. In a clustered broker it will take steps
+ * to synchronize command order across the cluster. In a stand-alone broker
+ * it is just a synonym for getProxy()
+ */
+ framing::AMQP_ClientProxy& getClusterOrderProxy();
+
Broker& broker;
SessionHandler* handler;
sys::AbsTime expiry; // Used by SessionManager.
@@ -138,7 +147,7 @@ class SessionState : public qpid::SessionState,
// State used for producer flow control (rate limited)
qpid::sys::Mutex rateLock;
- RateFlowcontrol* rateFlowcontrol;
+ boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol;
boost::intrusive_ptr<TimerTask> flowControlTimer;
friend class SessionManager;
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 1a3f7c4ef7..9c2b4f1638 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -62,7 +62,8 @@ NoOpConnectionOutputHandler Connection::discardHandler;
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& wrappedId, ConnectionId myId)
: cluster(c), self(myId), catchUp(false), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId), expectProtocolHeader(false)
+ connection(&output, cluster.getBroker(), wrappedId), expectProtocolHeader(false),
+ mcastFrameHandler(cluster.getMulticast(), self)
{ init(); }
// Local connections
@@ -70,15 +71,20 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink)
: cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out),
connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ? ++catchUpId : 0),
- expectProtocolHeader(isLink)
+ expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self)
{ init(); }
void Connection::init() {
QPID_LOG(debug, cluster << " new connection: " << *this);
- if (isLocalClient()) {
+ if (isLocalClient()) {
+ connection.setClusterOrderOutput(mcastFrameHandler); // Actively send cluster-order frames from local node
cluster.addLocalConnection(this);
giveReadCredit(cluster.getReadMax());
}
+ else { // Shadow or catch-up connection
+ connection.setClusterOrderOutput(nullFrameHandler); // Passive, discard cluster-order frames
+ connection.setClientThrottling(false); // Disable client throttling, done by active node.
+ }
}
void Connection::giveReadCredit(int credit) {
@@ -143,7 +149,15 @@ void Connection::deliveredFrame(const EventFrame& f) {
if (!framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol.
&& !checkUnsupported(*f.frame.getBody())) // Unsupported operation.
{
- connection.received(const_cast<AMQFrame&>(f.frame)); // Pass to broker connection.
+ // FIXME aconway 2009-02-24: Using the DATA/CONTROL
+ // distinction to distinguish incoming vs. outgoing frames is
+ // very unclear.
+ if (f.type == DATA) // incoming data frames to broker::Connection
+ connection.received(const_cast<AMQFrame&>(f.frame));
+ else { // outgoing data frame, send via SessionState
+ broker::SessionState* ss = connection.getChannel(f.frame.getChannel()).getSession();
+ if (ss) ss->out(const_cast<AMQFrame&>(f.frame));
+ }
}
giveReadCredit(f.readCredit);
}
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 98b47e1bc0..cefea00262 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -27,6 +27,7 @@
#include "OutputInterceptor.h"
#include "NoOpConnectionOutputHandler.h"
#include "EventFrame.h"
+#include "McastFrameHandler.h"
#include "qpid/broker/Connection.h"
#include "qpid/amqp_0_10/Connection.h"
@@ -150,6 +151,10 @@ class Connection :
void giveReadCredit(int credit);
private:
+ struct NullFrameHandler : public framing::FrameHandler {
+ void handle(framing::AMQFrame&) {}
+ };
+
void init();
bool checkUnsupported(const framing::AMQBody& body);
void deliverClose();
@@ -174,6 +179,8 @@ class Connection :
framing::ChannelId currentChannel;
boost::shared_ptr<broker::TxBuffer> txBuffer;
bool expectProtocolHeader;
+ McastFrameHandler mcastFrameHandler;
+ NullFrameHandler nullFrameHandler;
static qpid::sys::AtomicValue<uint64_t> catchUpId;
diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp
index e30b961b3e..9fe5376bc5 100644
--- a/cpp/src/qpid/cluster/Event.cpp
+++ b/cpp/src/qpid/cluster/Event.cpp
@@ -74,14 +74,17 @@ Event Event::decodeCopy(const MemberId& m, framing::Buffer& buf) {
return e;
}
-Event Event::control(const framing::AMQBody& body, const ConnectionId& cid) {
- framing::AMQFrame f(body);
+Event Event::control(const framing::AMQFrame& f, const ConnectionId& cid) {
Event e(CONTROL, cid, f.encodedSize());
Buffer buf(e);
f.encode(buf);
return e;
}
+Event Event::control(const framing::AMQBody& body, const ConnectionId& cid) {
+ return control(framing::AMQFrame(body), cid);
+}
+
iovec Event::toIovec() {
encodeHeader();
iovec iov = { const_cast<char*>(getStore()), getStoreSize() };
@@ -110,10 +113,13 @@ Event::operator Buffer() const {
static const char* EVENT_TYPE_NAMES[] = { "data", "control" };
+std::ostream& operator << (std::ostream& o, EventType t) {
+ return o << EVENT_TYPE_NAMES[t];
+}
+
std::ostream& operator << (std::ostream& o, const EventHeader& e) {
o << "[event " << e.getConnectionId() << "/" << e.getSequence()
- << " " << EVENT_TYPE_NAMES[e.getType()]
- << " " << e.getSize() << " bytes]";
+ << " " << e.getType() << " " << e.getSize() << " bytes]";
return o;
}
diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h
index f1de248f89..1338ea7413 100644
--- a/cpp/src/qpid/cluster/Event.h
+++ b/cpp/src/qpid/cluster/Event.h
@@ -34,6 +34,7 @@ namespace qpid {
namespace framing {
class AMQBody;
+class AMQFrame;
class Buffer;
}
@@ -83,8 +84,11 @@ class Event : public EventHeader {
/** Create an event copied from delivered data. */
static Event decodeCopy(const MemberId& m, framing::Buffer&);
- /** Create an event containing a control */
+ /** Create a control event. */
static Event control(const framing::AMQBody&, const ConnectionId&);
+
+ /** Create a control event. */
+ static Event control(const framing::AMQFrame&, const ConnectionId&);
// Data excluding header.
char* getData() { return store + HEADER_SIZE; }
@@ -105,6 +109,7 @@ class Event : public EventHeader {
};
std::ostream& operator << (std::ostream&, const EventHeader&);
+
}} // namespace qpid::cluster
#endif /*!QPID_CLUSTER_EVENT_H*/
diff --git a/cpp/src/qpid/cluster/EventFrame.cpp b/cpp/src/qpid/cluster/EventFrame.cpp
index ba01c170dd..48c9eab958 100644
--- a/cpp/src/qpid/cluster/EventFrame.cpp
+++ b/cpp/src/qpid/cluster/EventFrame.cpp
@@ -27,13 +27,13 @@ namespace cluster {
EventFrame::EventFrame() : sequence(0) {}
EventFrame::EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc)
- : connectionId(e.getConnectionId()), frame(f), sequence(e.getSequence()), readCredit(rc)
+ : connectionId(e.getConnectionId()), frame(f), sequence(e.getSequence()), readCredit(rc), type(e.getType())
{
QPID_LATENCY_INIT(frame);
}
std::ostream& operator<<(std::ostream& o, const EventFrame& e) {
- return o << e.connectionId << "/" << e.sequence << " " << e.frame << " rc=" << e.readCredit;
+ return o << e.connectionId << "/" << e.sequence << " " << e.frame << " rc=" << e.readCredit << " type=" << e.type;
}
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/EventFrame.h b/cpp/src/qpid/cluster/EventFrame.h
index 7f33cedb5b..ef3c38658b 100644
--- a/cpp/src/qpid/cluster/EventFrame.h
+++ b/cpp/src/qpid/cluster/EventFrame.h
@@ -57,7 +57,8 @@ struct EventFrame
ConnectionId connectionId;
framing::AMQFrame frame;
uint64_t sequence;
- int readCredit; // last frame in an event, give credit when processed.
+ int readCredit; ///< last frame in an event, give credit when processed.
+ EventType type;
};
std::ostream& operator<<(std::ostream& o, const EventFrame& e);
diff --git a/cpp/src/qpid/cluster/McastFrameHandler.h b/cpp/src/qpid/cluster/McastFrameHandler.h
new file mode 100644
index 0000000000..5127c31c84
--- /dev/null
+++ b/cpp/src/qpid/cluster/McastFrameHandler.h
@@ -0,0 +1,46 @@
+#ifndef QPID_CLUSTER_MCASTFRAMEHANDLER_H
+#define QPID_CLUSTER_MCASTFRAMEHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "Multicaster.h"
+#include "qpid/framing/FrameHandler.h"
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * A frame handler that multicasts frames as CONTROL events.
+ */
+class McastFrameHandler : public framing::FrameHandler
+{
+ public:
+ McastFrameHandler(Multicaster& m, const ConnectionId& cid) : mcast(m), connection(cid) {}
+ void handle(framing::AMQFrame& frame) { mcast.mcastControl(frame, connection); }
+ private:
+ Multicaster& mcast;
+ ConnectionId connection;
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_MCASTFRAMEHANDLER_H*/
diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp
index 239b3f5f35..f0738ab08f 100644
--- a/cpp/src/qpid/cluster/Multicaster.cpp
+++ b/cpp/src/qpid/cluster/Multicaster.cpp
@@ -24,6 +24,7 @@
#include "qpid/log/Statement.h"
#include "qpid/sys/LatencyMetric.h"
#include "qpid/framing/AMQBody.h"
+#include "qpid/framing/AMQFrame.h"
namespace qpid {
namespace cluster {
@@ -43,6 +44,11 @@ void Multicaster::mcastControl(const framing::AMQBody& body, const ConnectionId&
mcast(Event::control(body, id));
}
+void Multicaster::mcastControl(const framing::AMQFrame& frame, const ConnectionId& id) {
+ QPID_LOG(trace, "MCAST " << id << ": " << frame);
+ mcast(Event::control(frame, id));
+}
+
void Multicaster::mcastBuffer(const char* data, size_t size, const ConnectionId& id) {
Event e(DATA, id, size);
memcpy(e.getData(), data, size);
diff --git a/cpp/src/qpid/cluster/Multicaster.h b/cpp/src/qpid/cluster/Multicaster.h
index 1dfee47bd5..d1c3115977 100644
--- a/cpp/src/qpid/cluster/Multicaster.h
+++ b/cpp/src/qpid/cluster/Multicaster.h
@@ -50,6 +50,7 @@ class Multicaster
boost::function<void()> onError
);
void mcastControl(const framing::AMQBody& controlBody, const ConnectionId&);
+ void mcastControl(const framing::AMQFrame& controlFrame, const ConnectionId&);
void mcastBuffer(const char*, size_t, const ConnectionId&);
void mcast(const Event& e);
/** End holding mode, held events are mcast */
diff --git a/cpp/src/qpid/cluster/types.h b/cpp/src/qpid/cluster/types.h
index d1d6fdc427..30454d9fbb 100644
--- a/cpp/src/qpid/cluster/types.h
+++ b/cpp/src/qpid/cluster/types.h
@@ -78,6 +78,8 @@ struct ConnectionId : public std::pair<MemberId, Connection*> {
std::ostream& operator<<(std::ostream&, const ConnectionId&);
+std::ostream& operator << (std::ostream&, EventType);
+
}} // namespace qpid::cluster
#endif /*!QPID_CLUSTER_TYPES_H*/
diff --git a/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h b/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
index df6de89982..32809d86a1 100644
--- a/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
+++ b/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
@@ -30,7 +30,7 @@ namespace sys {
/**
* A ConnectionOutputHandler that delegates to another
* ConnectionOutputHandler. Allows the "real" ConnectionOutputHandler
- * to be changed modified without updating all the pointers/references
+ * to be changed without updating all the pointers/references
* using the ConnectionOutputHandlerPtr
*/
class ConnectionOutputHandlerPtr : public ConnectionOutputHandler
diff --git a/cpp/src/tests/federated_cluster_test b/cpp/src/tests/federated_cluster_test
index ef0db4cb8e..8c2117fb8c 100755
--- a/cpp/src/tests/federated_cluster_test
+++ b/cpp/src/tests/federated_cluster_test
@@ -22,7 +22,7 @@
# Test reliability of the replication feature in the face of link
# failures:
srcdir=`dirname $0`
-PYTHON_DIR=${srcdir}/../../../python
+PYTHON_DIR=$srcdir/../../../python
trap stop_brokers EXIT
@@ -37,7 +37,7 @@ stop_brokers() {
unset BROKER_A
fi
if [[ $NODE_1 ]] ; then
- ./stop_cluster
+ $srcdir/stop_cluster
unset NODE_1
fi
}
diff --git a/cpp/src/tests/ssl_test b/cpp/src/tests/ssl_test
index c7b59b62ef..13965f3a03 100755
--- a/cpp/src/tests/ssl_test
+++ b/cpp/src/tests/ssl_test
@@ -39,8 +39,7 @@ create_certs() {
}
start_broker() {
- ../qpidd --daemon --transport ssl --port 0 --ssl-port 0 --no-data-dir --no-module-dir --auth no --config $CONFIG --load-module ../.libs/ssl.so --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE > qpidd.port
- PORT=`cat qpidd.port`
+ PORT=`../qpidd --daemon --transport ssl --port 0 --ssl-port 0 --no-data-dir --no-module-dir --auth no --config $CONFIG --load-module ../.libs/ssl.so --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE`
}
stop_broker() {
diff --git a/cpp/src/tests/start_cluster b/cpp/src/tests/start_cluster
index 4f0516500c..48e299d942 100755
--- a/cpp/src/tests/start_cluster
+++ b/cpp/src/tests/start_cluster
@@ -32,7 +32,7 @@ test -f cluster.ports && { echo "cluster.ports file already exists" ; exit 1; }
rm -f cluster*.log
SIZE=${1:-1}; shift
CLUSTER=`pwd` # Cluster name=pwd, avoid clashes.
-OPTS="-d --no-module-dir --load-module ../.libs/cluster.so --cluster-name=$CLUSTER --no-data-dir --auth=no $*"
+OPTS="-d --no-module-dir --load-module ../.libs/cluster.so --cluster-name=$CLUSTER --no-data-dir --auth=no $@"
for (( i=0; i<SIZE; ++i )); do
PORT=`with_ais_group ../qpidd -p0 --log-to-file=cluster$i.log $OPTS` || exit 1