summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-07-24 19:39:27 +0000
committerAlan Conway <aconway@apache.org>2007-07-24 19:39:27 +0000
commitb7c528b027bff7585481c9ce3a01144040c6de5a (patch)
tree6e4588e6b52a5a5457767ae9f8b59cddcfd28ef6 /cpp/src
parent0dcc71862cb48a79263a05facd4c42453441cbb5 (diff)
downloadqpid-python-b7c528b027bff7585481c9ce3a01144040c6de5a.tar.gz
* Summary:
- Wiring (declare/delete/bind) is replicated via AIS. - TestOptions includes all logging options. - Logger automatically parses env vars so logging can be enabled for any program linked with libqpidcommon e.g. by setting QPID_TRACE=1. * src/qpid/cluster/SessionManager.cpp: Handle frames from cluster - Forward to BrokerAdapter for execution. - Suppress responses in proxy. * src/tests/TestOptions.h (Options): Logging options, --help option. * src/qpid/client/ClientConnection.cpp: Removed log initialization. Logs are initialized either in TestOptions or automatically from env vars, e.g. QPID_TRACE, * src/qpid/QpidError.h (class QpidError): Initialize Exception in constructor so messages can be logged. * src/qpid/framing/ChannelAdapter.h: Made send() virtual. * src/tests/Cluster_child.cpp: UUID corrected. * src/qpid/broker/Broker.cpp: Pass chains to updater by ref. * src/qpid/Options.cpp (parse): Fix log settings from environment. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@559171 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/Options.cpp33
-rw-r--r--cpp/src/qpid/QpidError.cpp5
-rw-r--r--cpp/src/qpid/QpidError.h9
-rw-r--r--cpp/src/qpid/broker/Broker.cpp2
-rw-r--r--cpp/src/qpid/client/ClientConnection.cpp3
-rw-r--r--cpp/src/qpid/cluster/ClassifierHandler.cpp3
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp5
-rw-r--r--cpp/src/qpid/cluster/SessionManager.cpp81
-rw-r--r--cpp/src/qpid/cluster/SessionManager.h17
-rw-r--r--cpp/src/qpid/framing/ChannelAdapter.h4
-rw-r--r--cpp/src/tests/Cluster.h4
-rw-r--r--cpp/src/tests/Cluster_child.cpp5
-rw-r--r--cpp/src/tests/TestOptions.h26
-rw-r--r--cpp/src/tests/cluster_client.cpp19
-rwxr-xr-xcpp/src/tests/start_cluster8
-rwxr-xr-xcpp/src/tests/stop_cluster4
16 files changed, 149 insertions, 79 deletions
diff --git a/cpp/src/qpid/Options.cpp b/cpp/src/qpid/Options.cpp
index 7f4536ff7b..2b6cff44f6 100644
--- a/cpp/src/qpid/Options.cpp
+++ b/cpp/src/qpid/Options.cpp
@@ -18,6 +18,9 @@
#include "Options.h"
#include "qpid/Exception.h"
+
+#include <boost/bind.hpp>
+
#include <fstream>
#include <algorithm>
#include <iostream>
@@ -28,18 +31,26 @@ using namespace std;
namespace {
-char env2optchar(char env) { return (env=='_') ? '-' : tolower(env); }
+struct EnvOptMapper {
+ static bool matchChar(char env, char opt) {
+ return (env==toupper(opt)) || (strchr("-.", opt) && env=='_');
+ }
-struct Mapper {
- Mapper(const Options& o) : opts(o) {}
- string operator()(const string& env) {
+ static bool matchStr(const string& env, boost::shared_ptr<po::option_description> desc) {
+ return std::equal(env.begin(), env.end(), desc->long_name().begin(), &matchChar);
+ }
+
+ EnvOptMapper(const Options& o) : opts(o) {}
+
+ string operator()(const string& envVar) {
static const std::string prefix("QPID_");
- if (env.substr(0, prefix.size()) == prefix) {
- string opt = env.substr(prefix.size());
- transform(opt.begin(), opt.end(), opt.begin(), env2optchar);
- // Ignore env vars that don't match to known options.
- if (opts.find_nothrow(opt, false))
- return opt;
+ if (envVar.substr(0, prefix.size()) == prefix) {
+ string env = envVar.substr(prefix.size());
+ typedef const std::vector< boost::shared_ptr<po::option_description> > OptDescs;
+ OptDescs::const_iterator i =
+ find_if(opts.options().begin(), opts.options().end(), boost::bind(matchStr, env, _1));
+ if (i != opts.options().end())
+ return (*i)->long_name();
}
return string();
}
@@ -62,7 +73,7 @@ void Options::parse(int argc, char** argv, const std::string& configFile)
if (argc > 0 && argv != 0)
po::store(po::parse_command_line(argc, argv, *this), vm);
parsing="environment variables";
- po::store(po::parse_environment(*this, Mapper(*this)), vm);
+ po::store(po::parse_environment(*this, EnvOptMapper(*this)), vm);
po::notify(vm); // configFile may be updated from arg/env options.
if (!configFile.empty()) {
parsing="configuration file "+configFile;
diff --git a/cpp/src/qpid/QpidError.cpp b/cpp/src/qpid/QpidError.cpp
index fcd5af47a5..740ec24e54 100644
--- a/cpp/src/qpid/QpidError.cpp
+++ b/cpp/src/qpid/QpidError.cpp
@@ -34,9 +34,8 @@ Exception::auto_ptr QpidError::clone() const throw() { return Exception::auto_pt
void QpidError::throwSelf() const { throw *this; }
-void QpidError::init() {
- whatStr = boost::str(boost::format("Error [%d] %s (%s:%d)")
- % code % msg % loc.file % loc.line);
+std::string QpidError::message(int code, const std::string& msg, const char* file, int line) {
+ return (boost::format("Error [%d] %s (%s:%d)") % code % msg % file % line).str();
}
diff --git a/cpp/src/qpid/QpidError.h b/cpp/src/qpid/QpidError.h
index dea0902a0e..2ff6571365 100644
--- a/cpp/src/qpid/QpidError.h
+++ b/cpp/src/qpid/QpidError.h
@@ -48,16 +48,15 @@ class QpidError : public Exception
template <class T>
QpidError(int code_, const T& msg_, const SrcLine& loc_) throw()
- : code(code_), loc(loc_), msg(boost::lexical_cast<std::string>(msg_))
- { init(); }
+ : Exception(message(code_, boost::lexical_cast<std::string>(msg_), loc_.file.c_str(), loc_.line)),
+ code(code_), loc(loc_), msg(boost::lexical_cast<std::string>(msg_)) {}
~QpidError() throw();
Exception::auto_ptr clone() const throw();
void throwSelf() const;
- private:
-
- void init();
+ /** Format message for exception. */
+ static std::string message(int code, const std::string& msg, const char* file, int line);
};
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 86342b3c43..26ec55ac44 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -164,7 +164,7 @@ void Broker::add(const shared_ptr<HandlerUpdater>& updater) {
void Broker::update(FrameHandler::Chains& chains) {
for_each(handlerUpdaters.begin(), handlerUpdaters.end(),
- boost::bind(&HandlerUpdater::update, _1, chains));
+ boost::bind(&HandlerUpdater::update, _1, boost::ref(chains)));
}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/client/ClientConnection.cpp b/cpp/src/qpid/client/ClientConnection.cpp
index 102de555fd..4b8f32a26f 100644
--- a/cpp/src/qpid/client/ClientConnection.cpp
+++ b/cpp/src/qpid/client/ClientConnection.cpp
@@ -51,9 +51,6 @@ Connection::Connection(
isOpen(false), debug(_debug)
{
setConnector(defaultConnector);
- qpid::log::Options o;
- o.trace = debug;
- qpid::log::Logger::instance().configure(o, "qpid-c++-client");
}
Connection::~Connection(){}
diff --git a/cpp/src/qpid/cluster/ClassifierHandler.cpp b/cpp/src/qpid/cluster/ClassifierHandler.cpp
index 0d0465c89e..1cce126800 100644
--- a/cpp/src/qpid/cluster/ClassifierHandler.cpp
+++ b/cpp/src/qpid/cluster/ClassifierHandler.cpp
@@ -61,6 +61,9 @@ void ClassifierHandler::handle(AMQFrame& frame) {
Chain chosen;
shared_ptr<AMQMethodBody> method =
dynamic_pointer_cast<AMQMethodBody>(frame.getBody());
+ // FIXME aconway 2007-07-05: Need to stop bypassed frames
+ // from overtaking mcast frames.
+ //
if (method)
chosen=map[fullId(method)];
if (chosen)
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 10b1c44f40..b00152cbcd 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -50,9 +50,10 @@ struct ClusterPlugin : public Plugin {
// Only provide to a Broker, and only if the --cluster config is set.
if (broker && !options.clusterName.empty()) {
assert(!cluster); // A process can only belong to one cluster.
- sessions.reset(new SessionManager());
+
+ sessions.reset(new SessionManager(*broker));
cluster.reset(new Cluster(options.clusterName, broker->getUrl(), sessions));
- sessions->setClusterSend(cluster); // FIXME aconway 2007-07-10:
+ sessions->setClusterSend(cluster);
broker->add(sessions);
}
}
diff --git a/cpp/src/qpid/cluster/SessionManager.cpp b/cpp/src/qpid/cluster/SessionManager.cpp
index 24f201535d..9f6438cf92 100644
--- a/cpp/src/qpid/cluster/SessionManager.cpp
+++ b/cpp/src/qpid/cluster/SessionManager.cpp
@@ -16,17 +16,59 @@
*
*/
+#include "SessionManager.h"
+#include "ClassifierHandler.h"
+
#include "qpid/log/Statement.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/AMQFrame.h"
-#include "SessionManager.h"
-#include "ClassifierHandler.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/broker/BrokerAdapter.h"
+#include "qpid/broker/Connection.h"
+#include "qpid/broker/BrokerChannel.h"
+#include "qpid/framing/ChannelAdapter.h"
namespace qpid {
namespace cluster {
using namespace framing;
using namespace sys;
+using namespace broker;
+
+/** Handler to send frames direct to local broker (bypass correlation etc.) */
+struct BrokerHandler : public FrameHandler, private ChannelAdapter {
+ Connection connection;
+ Channel channel;
+ BrokerAdapter adapter;
+
+ // TODO aconway 2007-07-23: Lots of needless flab here (Channel,
+ // Connection, ChannelAdapter) As these classes are untangled the
+ // flab can be reduced. The real requirements are:
+ // - Dispatch methods direct to broker bypassing all the correlation muck
+ // - Efficiently suppress responses
+ // For the latter we are now using a ChannelAdapter with noop send()
+ // A more efficient solution would be a no-op proxy.
+ //
+ BrokerHandler(Broker& broker) :
+ connection(0, broker),
+ channel(connection, 1, 0),
+ adapter(channel, connection, broker, *this) {}
+
+ void handle(AMQFrame& frame) {
+ AMQMethodBody* body=dynamic_cast<AMQMethodBody*>(frame.body.get());
+ assert(body);
+ body->invoke(adapter, MethodContext()); // TODO aconway 2007-07-24: Remove MethodContext
+ }
+
+ // Dummy methods.
+ virtual void handleHeader(boost::shared_ptr<AMQHeaderBody>){}
+ virtual void handleContent(boost::shared_ptr<AMQContentBody>){}
+ virtual void handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>){}
+ virtual bool isOpen() const{ return true; }
+ virtual void handleMethodInContext(shared_ptr<AMQMethodBody>, const MethodContext&){}
+ // No-op send.
+ virtual RequestId send(shared_ptr<AMQBody>, Correlator::Action) { return 0; }
+};
/** Wrap plain AMQFrames in SessionFrames */
struct FrameWrapperHandler : public FrameHandler {
@@ -47,17 +89,15 @@ struct FrameWrapperHandler : public FrameHandler {
SessionFrameHandler::Chain next;
};
-SessionManager::SessionManager() {}
+SessionManager::SessionManager(Broker& b) : localBroker(new BrokerHandler(b)) {}
-void SessionManager::update(FrameHandler::Chains& chains)
-{
+void SessionManager::update(FrameHandler::Chains& chains) {
Mutex::ScopedLock l(lock);
// Create a new local session, store local chains.
Uuid uuid(true);
sessions[uuid] = chains;
- // Replace local incoming chain. Build from the back.
- //
+ // Replace local in chain. Build from the back.
// TODO aconway 2007-07-05: Currently mcast wiring, bypass
// everythign else.
assert(clusterSend);
@@ -65,39 +105,26 @@ void SessionManager::update(FrameHandler::Chains& chains)
FrameHandler::Chain classify(new ClassifierHandler(wiring, chains.in));
chains.in = classify;
- // FIXME aconway 2007-07-05: Need to stop bypassed frames
- // from overtaking mcast frames.
- //
-
- // Leave outgoing chain unmodified.
+ // Leave out chain unmodified.
// TODO aconway 2007-07-05: Failover will require replication of
// outgoing frames to session replicas.
-
}
void SessionManager::handle(SessionFrame& frame) {
- // Incoming from frame.
- FrameHandler::Chains chains;
+ // Incoming from cluster.
{
Mutex::ScopedLock l(lock);
+ assert(frame.isIncoming); // FIXME aconway 2007-07-24: Drop isIncoming?
SessionMap::iterator i = sessions.find(frame.uuid);
if (i == sessions.end()) {
- QPID_LOG(trace, "Non-local frame cluster: " << frame.frame);
- chains = nonLocal;
+ // Non local method frame, invoke.
+ localBroker->handle(frame.frame);
}
else {
- QPID_LOG(trace, "Local frame from cluster: " << frame.frame);
- chains = i->second;
+ // Local frame, continue on local chain
+ i->second.in->handle(frame.frame);
}
}
- FrameHandler::Chain chain =
- chain = frame.isIncoming ? chains.in : chains.out;
- // TODO aconway 2007-07-11: Should this be assert(chain)
- if (chain)
- chain->handle(frame.frame);
-
- // TODO aconway 2007-07-05: Here's where we should unblock frame
- // dispatch for the channel.
}
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/SessionManager.h b/cpp/src/qpid/cluster/SessionManager.h
index c23efde18e..77fc71733b 100644
--- a/cpp/src/qpid/cluster/SessionManager.h
+++ b/cpp/src/qpid/cluster/SessionManager.h
@@ -19,25 +19,33 @@
*
*/
-#include "qpid/broker/BrokerChannel.h"
#include "qpid/cluster/SessionFrame.h"
#include "qpid/framing/HandlerUpdater.h"
+#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/Uuid.h"
#include "qpid/sys/Mutex.h"
+#include <boost/noncopyable.hpp>
+
#include <map>
namespace qpid {
+
+namespace broker {
+class Broker;
+}
+
namespace cluster {
/**
* Manage sessions and handler chains for the cluster.
*
*/
-class SessionManager : public framing::HandlerUpdater, public SessionFrameHandler
+class SessionManager : public framing::HandlerUpdater, public SessionFrameHandler,
+ private boost::noncopyable
{
public:
- SessionManager();
+ SessionManager(broker::Broker& broker);
/** Set the handler to send to the cluster */
void setClusterSend(const SessionFrameHandler::Chain& send) { clusterSend=send; }
@@ -52,12 +60,13 @@ class SessionManager : public framing::HandlerUpdater, public SessionFrameHandle
framing::ChannelId getChannelId(const framing::Uuid&) const;
private:
+ class SessionOperations;
typedef std::map<framing::Uuid,framing::FrameHandler::Chains> SessionMap;
sys::Mutex lock;
SessionFrameHandler::Chain clusterSend;
+ framing::FrameHandler::Chain localBroker;
SessionMap sessions;
- framing::FrameHandler::Chains nonLocal;
};
diff --git a/cpp/src/qpid/framing/ChannelAdapter.h b/cpp/src/qpid/framing/ChannelAdapter.h
index 50b1c9ff7e..a7c9c61640 100644
--- a/cpp/src/qpid/framing/ChannelAdapter.h
+++ b/cpp/src/qpid/framing/ChannelAdapter.h
@@ -75,8 +75,8 @@ class ChannelAdapter : protected BodyHandler {
*response to this frame. Ignored if body is not a Request.
*@return If body is a request, the ID assigned else 0.
*/
- RequestId send(shared_ptr<AMQBody> body,
- Correlator::Action action=Correlator::Action());
+ virtual RequestId send(shared_ptr<AMQBody> body,
+ Correlator::Action action=Correlator::Action());
virtual bool isOpen() const = 0;
diff --git a/cpp/src/tests/Cluster.h b/cpp/src/tests/Cluster.h
index c2909e0c1b..e896fccafe 100644
--- a/cpp/src/tests/Cluster.h
+++ b/cpp/src/tests/Cluster.h
@@ -64,9 +64,9 @@ class TestHandler : public Handler<T&>, public vector<T>
Mutex::ScopedLock l(lock);
BOOST_MESSAGE(getpid()<<" TestHandler::waitFor("<<n<<") "<<this->size());
AbsTime deadline(now(), 2*TIME_SEC);
- while (vector<T>::size() < n && lock.wait(deadline))
+ while (this->size() < n && lock.wait(deadline))
;
- return vector<T>::size() >= n;
+ return this->size() >= n;
}
};
diff --git a/cpp/src/tests/Cluster_child.cpp b/cpp/src/tests/Cluster_child.cpp
index 9c119e5238..c509dc1950 100644
--- a/cpp/src/tests/Cluster_child.cpp
+++ b/cpp/src/tests/Cluster_child.cpp
@@ -32,15 +32,14 @@ static const ProtocolVersion VER;
/** Chlid part of Cluster::clusterTwo test */
void clusterTwo() {
- TestCluster cluster("clusterTwo", "amqp::2");
+ TestCluster cluster("clusterTwo", "amqp:child:2");
BOOST_REQUIRE(cluster.received.waitFor(1)); // Frame from parent.
BOOST_CHECK(cluster.received[0].isIncoming);
BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *cluster.received[0].frame.getBody());
BOOST_CHECK_EQUAL(2u, cluster.size()); // Me and parent
AMQFrame frame(VER, 1, new ChannelOkBody(VER));
- Uuid id(true);
- SessionFrame sf(id, frame, false);
+ SessionFrame sf(cluster.received[0].uuid, frame, false);
cluster.handle(sf);
BOOST_REQUIRE(cluster.received.waitFor(2));
BOOST_CHECK(!cluster.received[1].isIncoming);
diff --git a/cpp/src/tests/TestOptions.h b/cpp/src/tests/TestOptions.h
index ee3af0873a..5b3c0958f5 100644
--- a/cpp/src/tests/TestOptions.h
+++ b/cpp/src/tests/TestOptions.h
@@ -22,13 +22,18 @@
*/
#include "qpid/Options.h"
+#include "qpid/log/Options.h"
#include "qpid/Url.h"
+#include "qpid/log/Logger.h"
+
+#include <iostream>
+#include <exception>
namespace qpid {
struct TestOptions : public qpid::Options
{
- TestOptions() : Options("Test Options"), host("localhost"), port(TcpAddress::DEFAULT_PORT), clientid("cpp"), trace(false), help(false)
+ TestOptions() : Options("Test Options"), host("localhost"), port(TcpAddress::DEFAULT_PORT), clientid("cpp"), help(false)
{
addOptions()
("host,h", optValue(host, "HOST"), "Broker host to connect to")
@@ -39,10 +44,26 @@ struct TestOptions : public qpid::Options
("clientname,n", optValue(clientid, "ID"), "unique client identifier")
("username", optValue(username, "USER"), "user name for broker log in.")
("password", optValue(password, "USER"), "password for broker log in.")
- ("trace,t", optValue(trace), "Turn on debug tracing.")
("help", optValue(help), "print this usage statement");
+ add(log);
}
+ /** As well as parsing, print help & exit if required */
+ void parse(int argc, char** argv) {
+ try {
+ qpid::Options::parse(argc, argv);
+ } catch (const std::exception& e) {
+ std::cout << e.what() << std::endl << *this << std::endl;
+ exit(1);
+ }
+ if (help) {
+ std::cout << *this << std::endl;
+ exit(0);
+ }
+ trace = log.trace;
+ qpid::log::Logger::instance().configure(log, argv[0]);
+ }
+
std::string host;
uint16_t port;
std::string virtualhost;
@@ -51,6 +72,7 @@ struct TestOptions : public qpid::Options
std::string password;
bool trace;
bool help;
+ log::Options log;
};
}
diff --git a/cpp/src/tests/cluster_client.cpp b/cpp/src/tests/cluster_client.cpp
index 421a33a40a..7620faa9fa 100644
--- a/cpp/src/tests/cluster_client.cpp
+++ b/cpp/src/tests/cluster_client.cpp
@@ -52,6 +52,7 @@ struct ClusterConnections : public vector<shared_ptr<Connection> > {
};
BOOST_AUTO_TEST_CASE(testWiringReplication) {
+ // Declare on one broker, use on others.
ClusterConnections cluster;
BOOST_REQUIRE(cluster.size() > 1);
@@ -63,13 +64,17 @@ BOOST_AUTO_TEST_CASE(testWiringReplication) {
broker0.declareExchange(fooEx);
broker0.declareQueue(fooQ);
broker0.bind(fooEx, fooQ, "FooKey");
-
- Channel broker1;
- cluster[1]->openChannel(broker1);
- broker1.publish(Message("hello"), fooEx, "FooKey");
- Message m;
- BOOST_REQUIRE(broker1.get(m, fooQ));
- BOOST_REQUIRE_EQUAL(m.getData(), "hello");
+ broker0.close();
+
+ for (size_t i = 1; i < cluster.size(); ++i) {
+ Channel ch;
+ cluster[i]->openChannel(ch);
+ ch.publish(Message("hello"), fooEx, "FooKey");
+ Message m;
+ BOOST_REQUIRE(ch.get(m, fooQ));
+ BOOST_REQUIRE_EQUAL(m.getData(), "hello");
+ ch.close();
+ }
}
diff --git a/cpp/src/tests/start_cluster b/cpp/src/tests/start_cluster
index c2806bb225..8f44854978 100755
--- a/cpp/src/tests/start_cluster
+++ b/cpp/src/tests/start_cluster
@@ -6,14 +6,14 @@
test -f cluster.ports && { echo "cluster.ports file already exists" ; exit 1; }
test -z "$*" && { echo "Usage: $0 cluster-size [options]"; exit 1; }
+rm -f cluster*.log cluster.ports
SIZE=$1
shift
OPTS=$*
-
+CLUSTER=`whoami` # Cluster name=user name, avoid clashes.
for (( i=0; i<SIZE; ++i )); do
- PORT=`../qpidd -dp0 --log.output=broker$i.log $OPTS` || exit 1
- PORTS="$PORT $PORTS"
+ PORT=`../qpidd -dp0 --log.output=cluster$i.log --cluster $CLUSTER $OPTS` || exit 1
+ echo $PORT >> cluster.ports
done
-echo $PORTS > cluster.ports
diff --git a/cpp/src/tests/stop_cluster b/cpp/src/tests/stop_cluster
index f5db5a4488..6afcb527e5 100755
--- a/cpp/src/tests/stop_cluster
+++ b/cpp/src/tests/stop_cluster
@@ -6,11 +6,9 @@ PORTS=`cat cluster.ports`
for PORT in $PORTS ; do
../qpidd -qp $PORT || ERROR="$ERROR $PORT"
done
+rm -f cluster.ports
if [ -n "$ERROR" ]; then
echo "Errors stopping brokers on ports: $ERROR"
- echo $ERROR > cluster.ports
exit 1
-else
- rm cluster.ports
fi