diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/cluster.mk | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ConnectionState.h | 21 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionHandler.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionHandler.h | 18 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 17 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.h | 11 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 22 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Event.cpp | 14 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Event.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/EventFrame.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/EventFrame.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/McastFrameHandler.h | 46 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Multicaster.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Multicaster.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/types.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h | 2 | ||||
-rwxr-xr-x | cpp/src/tests/federated_cluster_test | 4 | ||||
-rwxr-xr-x | cpp/src/tests/ssl_test | 3 | ||||
-rwxr-xr-x | cpp/src/tests/start_cluster | 2 |
20 files changed, 163 insertions, 31 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk index 0d34788d2e..55355d2077 100644 --- a/cpp/src/cluster.mk +++ b/cpp/src/cluster.mk @@ -70,6 +70,7 @@ cluster_la_SOURCES = \ qpid/cluster/FailoverExchange.h \ qpid/cluster/Multicaster.cpp \ qpid/cluster/Multicaster.h \ + qpid/cluster/MulticastFrameHandler.h \ qpid/cluster/NoOpConnectionOutputHandler.h \ qpid/cluster/OutputInterceptor.cpp \ qpid/cluster/OutputInterceptor.h \ diff --git a/cpp/src/qpid/broker/ConnectionState.h b/cpp/src/qpid/broker/ConnectionState.h index 0e9d211b56..0d7fbc5b3b 100644 --- a/cpp/src/qpid/broker/ConnectionState.h +++ b/cpp/src/qpid/broker/ConnectionState.h @@ -48,8 +48,9 @@ class ConnectionState : public ConnectionToken, public management::Manageable heartbeatmax(120), stagingThreshold(broker.getStagingThreshold()), federationLink(true), - clientSupportsThrottling(false) - {} + clientSupportsThrottling(false), + clusterOrderOut(0) + {} virtual ~ConnectionState () {} @@ -75,7 +76,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable const string& getFederationPeerTag() const { return federationPeerTag; } std::vector<Url>& getKnownHosts() { return knownHosts; } - void setClientThrottling() { clientSupportsThrottling = true; } + void setClientThrottling(bool set=true) { clientSupportsThrottling = set; } bool getClientThrottling() const { return clientSupportsThrottling; } Broker& getBroker() { return broker; } @@ -86,11 +87,20 @@ class ConnectionState : public ConnectionToken, public management::Manageable //contained output tasks sys::AggregateOutput outputTasks; - sys::ConnectionOutputHandlerPtr& getOutput() { return out; } + sys::ConnectionOutputHandler& getOutput() { return out; } framing::ProtocolVersion getVersion() const { return version; } - void setOutputHandler(qpid::sys::ConnectionOutputHandler* o) { out.set(o); } + /** + * If the broker is part of a cluster, this is a handler provided + * by cluster code. It ensures consistent ordering of commands + * that are sent based on criteria that are not predictably + * ordered cluster-wide, e.g. a timer firing. + */ + framing::FrameHandler* getClusterOrderOutput() { return clusterOrderOut; } + void setClusterOrderOutput(framing::FrameHandler& fh) { clusterOrderOut = &fh; } + + protected: framing::ProtocolVersion version; uint32_t framemax; @@ -103,6 +113,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable string federationPeerTag; std::vector<Url> knownHosts; bool clientSupportsThrottling; + framing::FrameHandler* clusterOrderOut; }; }} diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index 2c4de478f6..5bdc1e2500 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -34,7 +34,8 @@ using namespace qpid::sys; SessionHandler::SessionHandler(Connection& c, ChannelId ch) : amqp_0_10::SessionHandler(&c.getOutput(), ch), connection(c), - proxy(out) + proxy(out), + clusterOrderProxy(c.getClusterOrderOutput() ? new SetChannelProxy(ch, c.getClusterOrderOutput()) : 0) {} SessionHandler::~SessionHandler() {} diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h index 7449db1560..698e4f397f 100644 --- a/cpp/src/qpid/broker/SessionHandler.h +++ b/cpp/src/qpid/broker/SessionHandler.h @@ -54,6 +54,17 @@ class SessionHandler : public amqp_0_10::SessionHandler { framing::AMQP_ClientProxy& getProxy() { return proxy; } const framing::AMQP_ClientProxy& getProxy() const { return proxy; } + /** + * If commands are sent based on the local time (e.g. in timers), they don't have + * a well-defined ordering across cluster nodes. + * This proxy is for sending such commands. In a clustered broker it will take steps + * to synchronize command order across the cluster. In a stand-alone broker + * it is just a synonym for getProxy() + */ + framing::AMQP_ClientProxy& getClusterOrderProxy() { + return clusterOrderProxy.get() ? *clusterOrderProxy : proxy; + } + virtual void handleDetach(); // Overrides @@ -69,9 +80,16 @@ class SessionHandler : public amqp_0_10::SessionHandler { virtual void readyToSend(); private: + struct SetChannelProxy : public framing::AMQP_ClientProxy { // Proxy that sets the channel. + framing::ChannelHandler setChannel; + SetChannelProxy(uint16_t ch, framing::FrameHandler* out) + : framing::AMQP_ClientProxy(setChannel), setChannel(ch, out) {} + }; + Connection& connection; framing::AMQP_ClientProxy proxy; std::auto_ptr<SessionState> session; + std::auto_ptr<SetChannelProxy> clusterOrderProxy; }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index dffc7cf6af..b64fc20787 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -66,7 +66,7 @@ SessionState::SessionState( uint32_t maxRate = broker.getOptions().maxSessionRate; if (maxRate) { if (handler->getConnection().getClientThrottling()) { - rateFlowcontrol = new RateFlowcontrol(maxRate); + rateFlowcontrol.reset(new RateFlowcontrol(maxRate)); } else { QPID_LOG(warning, getId() << ": Unable to flow control client - client doesn't support"); } @@ -210,7 +210,6 @@ struct ScheduledCreditTask : public TimerTask { {} void fire() { - QPID_LOG(critical, "ScheduledCreditTask fired"); // FIXME aconway 2009-02-23: REMOVE // This is the best we can currently do to avoid a destruction/fire race if (!isCancelled()) { if ( !sessionState.processSendCredit(0) ) { @@ -275,7 +274,8 @@ bool SessionState::processSendCredit(uint32_t msgs) if ( msgs > 0 && rateFlowcontrol->flowStopped() ) { QPID_LOG(warning, getId() << ": producer throttling violation"); // TODO: Probably do message.stop("") first time then disconnect - getProxy().getMessage().stop(""); + // See comment on getClusterOrderProxy() in .h file + getClusterOrderProxy().getMessage().stop(""); return true; } AbsTime now = AbsTime::now(); @@ -283,7 +283,7 @@ bool SessionState::processSendCredit(uint32_t msgs) if (mgmtObject) mgmtObject->dec_clientCredit(msgs); if ( sendCredit>0 ) { QPID_LOG(debug, getId() << ": send producer credit " << sendCredit); - getProxy().getMessage().flow("", 0, sendCredit); + getClusterOrderProxy().getMessage().flow("", 0, sendCredit); rateFlowcontrol->sentCredit(now, sendCredit); if (mgmtObject) mgmtObject->inc_clientCredit(sendCredit); return true; @@ -364,8 +364,9 @@ void SessionState::readyToSend() { // Issue initial credit - use a heuristic here issue min of 300 messages or 1 secs worth uint32_t credit = std::min(rateFlowcontrol->getRate(), 300U); QPID_LOG(debug, getId() << ": Issuing producer message credit " << credit); - getProxy().getMessage().setFlowMode("", 0); - getProxy().getMessage().flow("", 0, credit); + // See comment on getClusterOrderProxy() in .h file + getClusterOrderProxy().getMessage().setFlowMode("", 0); + getClusterOrderProxy().getMessage().flow("", 0, credit); rateFlowcontrol->sentCredit(AbsTime::now(), credit); if (mgmtObject) mgmtObject->inc_clientCredit(credit); } @@ -373,4 +374,8 @@ void SessionState::readyToSend() { Broker& SessionState::getBroker() { return broker; } +framing::AMQP_ClientProxy& SessionState::getClusterOrderProxy() { + return handler->getClusterOrderProxy(); +} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index c435a741f8..b64461eb86 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -125,6 +125,15 @@ class SessionState : public qpid::SessionState, void sendAcceptAndCompletion(); + /** + * If commands are sent based on the local time (e.g. in timers), they don't have + * a well-defined ordering across cluster nodes. + * This proxy is for sending such commands. In a clustered broker it will take steps + * to synchronize command order across the cluster. In a stand-alone broker + * it is just a synonym for getProxy() + */ + framing::AMQP_ClientProxy& getClusterOrderProxy(); + Broker& broker; SessionHandler* handler; sys::AbsTime expiry; // Used by SessionManager. @@ -138,7 +147,7 @@ class SessionState : public qpid::SessionState, // State used for producer flow control (rate limited) qpid::sys::Mutex rateLock; - RateFlowcontrol* rateFlowcontrol; + boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol; boost::intrusive_ptr<TimerTask> flowControlTimer; friend class SessionManager; diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 1a3f7c4ef7..9c2b4f1638 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -62,7 +62,8 @@ NoOpConnectionOutputHandler Connection::discardHandler; Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& wrappedId, ConnectionId myId) : cluster(c), self(myId), catchUp(false), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId), expectProtocolHeader(false) + connection(&output, cluster.getBroker(), wrappedId), expectProtocolHeader(false), + mcastFrameHandler(cluster.getMulticast(), self) { init(); } // Local connections @@ -70,15 +71,20 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink) : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out), connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ? ++catchUpId : 0), - expectProtocolHeader(isLink) + expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self) { init(); } void Connection::init() { QPID_LOG(debug, cluster << " new connection: " << *this); - if (isLocalClient()) { + if (isLocalClient()) { + connection.setClusterOrderOutput(mcastFrameHandler); // Actively send cluster-order frames from local node cluster.addLocalConnection(this); giveReadCredit(cluster.getReadMax()); } + else { // Shadow or catch-up connection + connection.setClusterOrderOutput(nullFrameHandler); // Passive, discard cluster-order frames + connection.setClientThrottling(false); // Disable client throttling, done by active node. + } } void Connection::giveReadCredit(int credit) { @@ -143,7 +149,15 @@ void Connection::deliveredFrame(const EventFrame& f) { if (!framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol. && !checkUnsupported(*f.frame.getBody())) // Unsupported operation. { - connection.received(const_cast<AMQFrame&>(f.frame)); // Pass to broker connection. + // FIXME aconway 2009-02-24: Using the DATA/CONTROL + // distinction to distinguish incoming vs. outgoing frames is + // very unclear. + if (f.type == DATA) // incoming data frames to broker::Connection + connection.received(const_cast<AMQFrame&>(f.frame)); + else { // outgoing data frame, send via SessionState + broker::SessionState* ss = connection.getChannel(f.frame.getChannel()).getSession(); + if (ss) ss->out(const_cast<AMQFrame&>(f.frame)); + } } giveReadCredit(f.readCredit); } diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 98b47e1bc0..cefea00262 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -27,6 +27,7 @@ #include "OutputInterceptor.h" #include "NoOpConnectionOutputHandler.h" #include "EventFrame.h" +#include "McastFrameHandler.h" #include "qpid/broker/Connection.h" #include "qpid/amqp_0_10/Connection.h" @@ -150,6 +151,10 @@ class Connection : void giveReadCredit(int credit); private: + struct NullFrameHandler : public framing::FrameHandler { + void handle(framing::AMQFrame&) {} + }; + void init(); bool checkUnsupported(const framing::AMQBody& body); void deliverClose(); @@ -174,6 +179,8 @@ class Connection : framing::ChannelId currentChannel; boost::shared_ptr<broker::TxBuffer> txBuffer; bool expectProtocolHeader; + McastFrameHandler mcastFrameHandler; + NullFrameHandler nullFrameHandler; static qpid::sys::AtomicValue<uint64_t> catchUpId; diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp index e30b961b3e..9fe5376bc5 100644 --- a/cpp/src/qpid/cluster/Event.cpp +++ b/cpp/src/qpid/cluster/Event.cpp @@ -74,14 +74,17 @@ Event Event::decodeCopy(const MemberId& m, framing::Buffer& buf) { return e; } -Event Event::control(const framing::AMQBody& body, const ConnectionId& cid) { - framing::AMQFrame f(body); +Event Event::control(const framing::AMQFrame& f, const ConnectionId& cid) { Event e(CONTROL, cid, f.encodedSize()); Buffer buf(e); f.encode(buf); return e; } +Event Event::control(const framing::AMQBody& body, const ConnectionId& cid) { + return control(framing::AMQFrame(body), cid); +} + iovec Event::toIovec() { encodeHeader(); iovec iov = { const_cast<char*>(getStore()), getStoreSize() }; @@ -110,10 +113,13 @@ Event::operator Buffer() const { static const char* EVENT_TYPE_NAMES[] = { "data", "control" }; +std::ostream& operator << (std::ostream& o, EventType t) { + return o << EVENT_TYPE_NAMES[t]; +} + std::ostream& operator << (std::ostream& o, const EventHeader& e) { o << "[event " << e.getConnectionId() << "/" << e.getSequence() - << " " << EVENT_TYPE_NAMES[e.getType()] - << " " << e.getSize() << " bytes]"; + << " " << e.getType() << " " << e.getSize() << " bytes]"; return o; } diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h index f1de248f89..1338ea7413 100644 --- a/cpp/src/qpid/cluster/Event.h +++ b/cpp/src/qpid/cluster/Event.h @@ -34,6 +34,7 @@ namespace qpid { namespace framing { class AMQBody; +class AMQFrame; class Buffer; } @@ -83,8 +84,11 @@ class Event : public EventHeader { /** Create an event copied from delivered data. */ static Event decodeCopy(const MemberId& m, framing::Buffer&); - /** Create an event containing a control */ + /** Create a control event. */ static Event control(const framing::AMQBody&, const ConnectionId&); + + /** Create a control event. */ + static Event control(const framing::AMQFrame&, const ConnectionId&); // Data excluding header. char* getData() { return store + HEADER_SIZE; } @@ -105,6 +109,7 @@ class Event : public EventHeader { }; std::ostream& operator << (std::ostream&, const EventHeader&); + }} // namespace qpid::cluster #endif /*!QPID_CLUSTER_EVENT_H*/ diff --git a/cpp/src/qpid/cluster/EventFrame.cpp b/cpp/src/qpid/cluster/EventFrame.cpp index ba01c170dd..48c9eab958 100644 --- a/cpp/src/qpid/cluster/EventFrame.cpp +++ b/cpp/src/qpid/cluster/EventFrame.cpp @@ -27,13 +27,13 @@ namespace cluster { EventFrame::EventFrame() : sequence(0) {} EventFrame::EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc) - : connectionId(e.getConnectionId()), frame(f), sequence(e.getSequence()), readCredit(rc) + : connectionId(e.getConnectionId()), frame(f), sequence(e.getSequence()), readCredit(rc), type(e.getType()) { QPID_LATENCY_INIT(frame); } std::ostream& operator<<(std::ostream& o, const EventFrame& e) { - return o << e.connectionId << "/" << e.sequence << " " << e.frame << " rc=" << e.readCredit; + return o << e.connectionId << "/" << e.sequence << " " << e.frame << " rc=" << e.readCredit << " type=" << e.type; } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/EventFrame.h b/cpp/src/qpid/cluster/EventFrame.h index 7f33cedb5b..ef3c38658b 100644 --- a/cpp/src/qpid/cluster/EventFrame.h +++ b/cpp/src/qpid/cluster/EventFrame.h @@ -57,7 +57,8 @@ struct EventFrame ConnectionId connectionId; framing::AMQFrame frame; uint64_t sequence; - int readCredit; // last frame in an event, give credit when processed. + int readCredit; ///< last frame in an event, give credit when processed. + EventType type; }; std::ostream& operator<<(std::ostream& o, const EventFrame& e); diff --git a/cpp/src/qpid/cluster/McastFrameHandler.h b/cpp/src/qpid/cluster/McastFrameHandler.h new file mode 100644 index 0000000000..5127c31c84 --- /dev/null +++ b/cpp/src/qpid/cluster/McastFrameHandler.h @@ -0,0 +1,46 @@ +#ifndef QPID_CLUSTER_MCASTFRAMEHANDLER_H +#define QPID_CLUSTER_MCASTFRAMEHANDLER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "types.h" +#include "Multicaster.h" +#include "qpid/framing/FrameHandler.h" + +namespace qpid { +namespace cluster { + +/** + * A frame handler that multicasts frames as CONTROL events. + */ +class McastFrameHandler : public framing::FrameHandler +{ + public: + McastFrameHandler(Multicaster& m, const ConnectionId& cid) : mcast(m), connection(cid) {} + void handle(framing::AMQFrame& frame) { mcast.mcastControl(frame, connection); } + private: + Multicaster& mcast; + ConnectionId connection; +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_MCASTFRAMEHANDLER_H*/ diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp index 239b3f5f35..f0738ab08f 100644 --- a/cpp/src/qpid/cluster/Multicaster.cpp +++ b/cpp/src/qpid/cluster/Multicaster.cpp @@ -24,6 +24,7 @@ #include "qpid/log/Statement.h" #include "qpid/sys/LatencyMetric.h" #include "qpid/framing/AMQBody.h" +#include "qpid/framing/AMQFrame.h" namespace qpid { namespace cluster { @@ -43,6 +44,11 @@ void Multicaster::mcastControl(const framing::AMQBody& body, const ConnectionId& mcast(Event::control(body, id)); } +void Multicaster::mcastControl(const framing::AMQFrame& frame, const ConnectionId& id) { + QPID_LOG(trace, "MCAST " << id << ": " << frame); + mcast(Event::control(frame, id)); +} + void Multicaster::mcastBuffer(const char* data, size_t size, const ConnectionId& id) { Event e(DATA, id, size); memcpy(e.getData(), data, size); diff --git a/cpp/src/qpid/cluster/Multicaster.h b/cpp/src/qpid/cluster/Multicaster.h index 1dfee47bd5..d1c3115977 100644 --- a/cpp/src/qpid/cluster/Multicaster.h +++ b/cpp/src/qpid/cluster/Multicaster.h @@ -50,6 +50,7 @@ class Multicaster boost::function<void()> onError ); void mcastControl(const framing::AMQBody& controlBody, const ConnectionId&); + void mcastControl(const framing::AMQFrame& controlFrame, const ConnectionId&); void mcastBuffer(const char*, size_t, const ConnectionId&); void mcast(const Event& e); /** End holding mode, held events are mcast */ diff --git a/cpp/src/qpid/cluster/types.h b/cpp/src/qpid/cluster/types.h index d1d6fdc427..30454d9fbb 100644 --- a/cpp/src/qpid/cluster/types.h +++ b/cpp/src/qpid/cluster/types.h @@ -78,6 +78,8 @@ struct ConnectionId : public std::pair<MemberId, Connection*> { std::ostream& operator<<(std::ostream&, const ConnectionId&); +std::ostream& operator << (std::ostream&, EventType); + }} // namespace qpid::cluster #endif /*!QPID_CLUSTER_TYPES_H*/ diff --git a/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h b/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h index df6de89982..32809d86a1 100644 --- a/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h +++ b/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h @@ -30,7 +30,7 @@ namespace sys { /** * A ConnectionOutputHandler that delegates to another * ConnectionOutputHandler. Allows the "real" ConnectionOutputHandler - * to be changed modified without updating all the pointers/references + * to be changed without updating all the pointers/references * using the ConnectionOutputHandlerPtr */ class ConnectionOutputHandlerPtr : public ConnectionOutputHandler diff --git a/cpp/src/tests/federated_cluster_test b/cpp/src/tests/federated_cluster_test index ef0db4cb8e..8c2117fb8c 100755 --- a/cpp/src/tests/federated_cluster_test +++ b/cpp/src/tests/federated_cluster_test @@ -22,7 +22,7 @@ # Test reliability of the replication feature in the face of link # failures: srcdir=`dirname $0` -PYTHON_DIR=${srcdir}/../../../python +PYTHON_DIR=$srcdir/../../../python trap stop_brokers EXIT @@ -37,7 +37,7 @@ stop_brokers() { unset BROKER_A fi if [[ $NODE_1 ]] ; then - ./stop_cluster + $srcdir/stop_cluster unset NODE_1 fi } diff --git a/cpp/src/tests/ssl_test b/cpp/src/tests/ssl_test index c7b59b62ef..13965f3a03 100755 --- a/cpp/src/tests/ssl_test +++ b/cpp/src/tests/ssl_test @@ -39,8 +39,7 @@ create_certs() { } start_broker() { - ../qpidd --daemon --transport ssl --port 0 --ssl-port 0 --no-data-dir --no-module-dir --auth no --config $CONFIG --load-module ../.libs/ssl.so --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE > qpidd.port - PORT=`cat qpidd.port` + PORT=`../qpidd --daemon --transport ssl --port 0 --ssl-port 0 --no-data-dir --no-module-dir --auth no --config $CONFIG --load-module ../.libs/ssl.so --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE` } stop_broker() { diff --git a/cpp/src/tests/start_cluster b/cpp/src/tests/start_cluster index 4f0516500c..48e299d942 100755 --- a/cpp/src/tests/start_cluster +++ b/cpp/src/tests/start_cluster @@ -32,7 +32,7 @@ test -f cluster.ports && { echo "cluster.ports file already exists" ; exit 1; } rm -f cluster*.log SIZE=${1:-1}; shift CLUSTER=`pwd` # Cluster name=pwd, avoid clashes. -OPTS="-d --no-module-dir --load-module ../.libs/cluster.so --cluster-name=$CLUSTER --no-data-dir --auth=no $*" +OPTS="-d --no-module-dir --load-module ../.libs/cluster.so --cluster-name=$CLUSTER --no-data-dir --auth=no $@" for (( i=0; i<SIZE; ++i )); do PORT=`with_ais_group ../qpidd -p0 --log-to-file=cluster$i.log $OPTS` || exit 1 |