summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-02-16 21:38:38 +0000
committerAlan Conway <aconway@apache.org>2011-02-16 21:38:38 +0000
commitfca7b2ac23c76c402457ab605639ba4b16a5e3f1 (patch)
treedeb9cfb8003de62906bd4577311cdb8466619202
parent9d1210ea9582fa8863fe788428751fca8e2f64ad (diff)
downloadqpid-python-fca7b2ac23c76c402457ab605639ba4b16a5e3f1.tar.gz
Merge branch 'set-in-cluster-early-init' into qpid-2935
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2935@1071410 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp24
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h47
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp17
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp70
-rw-r--r--qpid/cpp/src/qpid/cluster/Event.h16
-rw-r--r--qpid/cpp/src/qpid/cluster/EventFrame.h6
-rw-r--r--qpid/cpp/src/qpid/sys/ClusterSafe.cpp2
-rw-r--r--qpid/cpp/src/qpid/sys/ClusterSafe.h3
10 files changed, 97 insertions, 95 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 2370795d0d..2ef6933612 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -60,7 +60,6 @@
#include "qpid/StringUtils.h"
#include "qpid/Url.h"
#include "qpid/Version.h"
-#include "qpid/sys/ClusterSafe.h"
#include <boost/bind.hpp>
#include <boost/format.hpp>
@@ -171,8 +170,9 @@ Broker::Broker(const Broker::Options& conf) :
conf.replayHardLimit*1024),
*this),
queueCleaner(queues, timer),
- queueEvents(poller,!conf.asyncQueueEvents),
+ queueEvents(poller,!conf.asyncQueueEvents),
recovery(true),
+ inCluster(false),
clusterUpdatee(false),
expiryPolicy(new ExpiryPolicy),
connectionCounter(conf.maxConnections),
@@ -230,8 +230,8 @@ Broker::Broker(const Broker::Options& conf) :
// Early-Initialize plugins
Plugin::earlyInitAll(*this);
- /** todo KAG - remove once cluster support for flow control done + (and ClusterSafe.h include above). */
- if (sys::isCluster()) {
+ /** todo KAG - remove once cluster support for flow control done */
+ if (isInCluster()) {
QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled by default.");
QueueFlowLimit::setDefaults(0, 0, 0);
} else {
@@ -239,7 +239,7 @@ Broker::Broker(const Broker::Options& conf) :
}
// If no plugin store module registered itself, set up the null store.
- if (NullMessageStore::isNullStore(store.get()))
+ if (NullMessageStore::isNullStore(store.get()))
setStore();
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
@@ -358,14 +358,14 @@ void Broker::run() {
Dispatcher d(poller);
int numIOThreads = config.workerThreads;
std::vector<Thread> t(numIOThreads-1);
-
+
// Run n-1 io threads
for (int i=0; i<numIOThreads-1; ++i)
t[i] = Thread(d);
-
+
// Run final thread
d.run();
-
+
// Now wait for n-1 io threads to exit
for (int i=0; i<numIOThreads-1; ++i) {
t[i].join();
@@ -412,9 +412,9 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
{
case _qmf::Broker::METHOD_ECHO :
QPID_LOG (debug, "Broker::echo("
- << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_sequence
- << ", "
- << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_body
+ << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_sequence
+ << ", "
+ << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_body
<< ")");
status = Manageable::STATUS_OK;
break;
@@ -479,7 +479,7 @@ std::string Broker::getLogLevel()
const std::vector<std::string>& selectors = qpid::log::Logger::instance().getOptions().selectors;
for (std::vector<std::string>::const_iterator i = selectors.begin(); i != selectors.end(); ++i) {
if (i != selectors.begin()) level += std::string(",");
- level += *i;
+ level += *i;
}
return level;
}
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 263cee51f5..55823bc45a 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -57,7 +57,7 @@
#include <string>
#include <vector>
-namespace qpid {
+namespace qpid {
namespace sys {
class ProtocolFactory;
@@ -80,7 +80,7 @@ struct NoSuchTransportException : qpid::Exception
};
/**
- * A broker instance.
+ * A broker instance.
*/
class Broker : public sys::Runnable, public Plugin::Target,
public management::Manageable,
@@ -121,25 +121,25 @@ public:
private:
std::string getHome();
};
-
+
class ConnectionCounter {
int maxConnections;
int connectionCount;
sys::Mutex connectionCountLock;
public:
ConnectionCounter(int mc): maxConnections(mc),connectionCount(0) {};
- void inc_connectionCount() {
- sys::ScopedLock<sys::Mutex> l(connectionCountLock);
+ void inc_connectionCount() {
+ sys::ScopedLock<sys::Mutex> l(connectionCountLock);
connectionCount++;
- }
- void dec_connectionCount() {
- sys::ScopedLock<sys::Mutex> l(connectionCountLock);
+ }
+ void dec_connectionCount() {
+ sys::ScopedLock<sys::Mutex> l(connectionCountLock);
connectionCount--;
}
bool allowConnection() {
- sys::ScopedLock<sys::Mutex> l(connectionCountLock);
+ sys::ScopedLock<sys::Mutex> l(connectionCountLock);
return (maxConnections <= connectionCount);
- }
+ }
};
private:
@@ -177,10 +177,10 @@ public:
const boost::intrusive_ptr<Message>& msg);
std::string federationTag;
bool recovery;
- bool clusterUpdatee;
+ bool inCluster, clusterUpdatee;
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
ConnectionCounter connectionCounter;
-
+
public:
virtual ~Broker();
@@ -236,7 +236,7 @@ public:
QPID_BROKER_EXTERN void accept();
/** Create a connection to another broker. */
- void connect(const std::string& host, uint16_t port,
+ void connect(const std::string& host, uint16_t port,
const std::string& transport,
boost::function2<void, int, std::string> failed,
sys::ConnectionCodec::Factory* =0);
@@ -248,9 +248,9 @@ public:
/** Move messages from one queue to another.
A zero quantity means to move all messages
*/
- uint32_t queueMoveMessages( const std::string& srcQueue,
+ uint32_t queueMoveMessages( const std::string& srcQueue,
const std::string& destQueue,
- uint32_t qty);
+ uint32_t qty);
boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory(const std::string& name = TCP_TRANSPORT) const;
@@ -274,11 +274,20 @@ public:
void setRecovery(bool set) { recovery = set; }
bool getRecovery() const { return recovery; }
- void setClusterUpdatee(bool set) { clusterUpdatee = set; }
+ /** True of this broker is part of a cluster.
+ * Only valid after early initialization of plugins is complete.
+ */
+ bool isInCluster() const { return inCluster; }
+ void setInCluster(bool set) { inCluster = set; }
+
+ /** True if this broker is joining a cluster and in the process of
+ * receiving a state update.
+ */
bool isClusterUpdatee() const { return clusterUpdatee; }
+ void setClusterUpdatee(bool set) { clusterUpdatee = set; }
management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
-
+
ConnectionCounter& getConnectionCounter() {return connectionCounter;}
/**
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index 460799280e..67713a6eb7 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -278,8 +278,7 @@ void Connection::setUserId(const string& userId)
ConnectionState::setUserId(userId);
// In a cluster, the cluster code will raise the connect event
// when the connection is replicated to the cluster.
- if (!sys::isCluster())
- raiseConnectEvent();
+ if (!broker.isInCluster()) raiseConnectEvent();
}
void Connection::raiseConnectEvent() {
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index e1091df724..f3acf7c660 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -134,7 +134,7 @@ void Link::established ()
QPID_LOG (info, "Inter-broker link established to " << addr.str());
// Don't raise the management event in a cluster, other members wont't get this call.
- if (!sys::isCluster())
+ if (broker && !broker->isInCluster())
agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
{
@@ -159,7 +159,7 @@ void Link::closed (int, std::string text)
stringstream addr;
addr << host << ":" << port;
QPID_LOG (warning, "Inter-broker link disconnected from " << addr.str());
- if (!sys::isCluster())
+ if (broker && !broker->isInCluster())
agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
}
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
index 7e8739fe9e..e7ad74b8ab 100644
--- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
@@ -320,11 +320,10 @@ std::auto_ptr<QueueFlowLimit> QueueFlowLimit::createQueueFlowLimit(Queue *queue,
return std::auto_ptr<QueueFlowLimit>();
}
/** todo KAG - remove once cluster support for flow control done. */
- if (sys::isCluster()) {
- if (queue) {
- QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue "
- << queue->getName());
- }
+ // TODO aconway 2011-02-16: is queue==0 only in tests?
+ if (queue && queue->getBroker() && queue->getBroker()->isInCluster()) {
+ QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue "
+ << queue->getName());
return std::auto_ptr<QueueFlowLimit>();
}
return std::auto_ptr<QueueFlowLimit>(new QueueFlowLimit(queue, flowStopCount, flowResumeCount,
@@ -337,11 +336,9 @@ std::auto_ptr<QueueFlowLimit> QueueFlowLimit::createQueueFlowLimit(Queue *queue,
uint64_t flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0));
/** todo KAG - remove once cluster support for flow control done. */
- if (sys::isCluster()) {
- if (queue) {
- QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue "
- << queue->getName());
- }
+ if (queue && queue->getBroker() && queue->getBroker()->isInCluster()) {
+ QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue "
+ << queue->getName());
return std::auto_ptr<QueueFlowLimit>();
}
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index dd4882774b..fe5a1c806e 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -36,28 +36,28 @@
*
* IMPORTANT NOTE: any time code is added to the broker that uses timers,
* the cluster may need to be updated to take account of this.
- *
+ *
*
* USE OF TIMESTAMPS IN THE BROKER
- *
+ *
* The following are the current areas where broker uses timers or timestamps:
- *
+ *
* - Producer flow control: broker::SemanticState uses
* connection::getClusterOrderOutput. a FrameHandler that sends
* frames to the client via the cluster. Used by broker::SessionState
- *
+ *
* - QueueCleaner, Message TTL: uses ExpiryPolicy, which is
* implemented by cluster::ExpiryPolicy.
- *
+ *
* - Connection heartbeat: sends connection controls, not part of
* session command counting so OK to ignore.
- *
+ *
* - LinkRegistry: only cluster elder is ever active for links.
- *
+ *
* - management::ManagementBroker: uses MessageHandler supplied by cluster
* to send messages to the broker via the cluster.
- *
- * - Dtx: not yet supported with cluster.
+ *
+ * - Dtx: not yet supported with cluster.
*
* cluster::ExpiryPolicy implements the strategy for message expiry.
*
@@ -65,16 +65,16 @@
* Used for periodic management events.
*
* <h1>CLUSTER PROTOCOL OVERVIEW</h1>
- *
+ *
* Messages sent to/from CPG are called Events.
*
* An Event carries a ConnectionId, which includes a MemberId and a
* connection number.
- *
+ *
* Events are either
* - Connection events: non-0 connection number and are associated with a connection.
* - Cluster Events: 0 connection number, are not associated with a connection.
- *
+ *
* Events are further categorized as:
* - Control: carries method frame(s) that affect cluster behavior.
* - Data: carries raw data received from a client connection.
@@ -214,7 +214,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
{
cluster.initialStatus(
member, version, active, clusterId,
- framing::cluster::StoreState(storeState), shutdownId,
+ framing::cluster::StoreState(storeState), shutdownId,
firstConfig, l);
}
void ready(const std::string& url) {
@@ -244,7 +244,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
};
Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
- settings(set),
+ settings(set),
broker(b),
mgmtObject(0),
poller(b.getPoller()),
@@ -279,6 +279,8 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
updateClosed(false),
error(*this)
{
+ broker.setInCluster(true);
+
// We give ownership of the timer to the broker and keep a plain pointer.
// This is OK as it means the timer has the same lifetime as the broker.
timer = new ClusterTimer(*this);
@@ -299,7 +301,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
// Load my store status before we go into initialization
if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
store.load();
- clusterId = store.getClusterId();
+ clusterId = store.getClusterId();
QPID_LOG(notice, "Cluster store state: " << store)
}
cpg.join(name);
@@ -360,14 +362,14 @@ void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
// Safe to use connections here because we're pre-catchup, stalled
// and discarding, so deliveredFrame is not processing any
// connection events.
- assert(discarding);
+ assert(discarding);
pair<ConnectionMap::iterator, bool> ib
= connections.insert(ConnectionMap::value_type(c->getId(), c));
assert(ib.second);
}
void Cluster::erase(const ConnectionId& id) {
- Lock l(lock);
+ Lock l(lock);
erase(id,l);
}
@@ -393,9 +395,9 @@ std::vector<Url> Cluster::getUrls() const {
std::vector<Url> Cluster::getUrls(Lock&) const {
return map.memberUrls();
-}
+}
-void Cluster::leave() {
+void Cluster::leave() {
Lock l(lock);
leave(l);
}
@@ -405,7 +407,7 @@ void Cluster::leave() {
QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \
} do {} while(0)
-void Cluster::leave(Lock&) {
+void Cluster::leave(Lock&) {
if (state != LEFT) {
state = LEFT;
QPID_LOG(notice, *this << " leaving cluster " << name);
@@ -424,7 +426,7 @@ void Cluster::deliver(
uint32_t nodeid,
uint32_t pid,
void* msg,
- int msg_len)
+ int msg_len)
{
MemberId from(nodeid, pid);
framing::Buffer buf(static_cast<char*>(msg), msg_len);
@@ -455,7 +457,7 @@ void Cluster::deliveredEvent(const Event& e) {
EventFrame ef(e, e.getFrame());
// Stop the deliverEventQueue on update offers.
// This preserves the connection decoder fragments for an update.
- // Only do this for the two brokers that are directly involved in this
+ // Only do this for the two brokers that are directly involved in this
// offer: the one making the offer, or the one receiving it.
const ClusterUpdateOfferBody* offer = castUpdateOffer(ef.frame.getBody());
if (offer && ( e.getMemberId() == self || MemberId(offer->getUpdatee()) == self) ) {
@@ -465,7 +467,7 @@ void Cluster::deliveredEvent(const Event& e) {
}
deliverFrame(ef);
}
- else if(!discarding) {
+ else if(!discarding) {
if (e.isControl())
deliverFrame(EventFrame(e, e.getFrame()));
else {
@@ -507,7 +509,7 @@ void Cluster::deliveredFrame(const EventFrame& efConst) {
// the event queue.
e.frame = AMQFrame(
ClusterRetractOfferBody(ProtocolVersion(), offer->getUpdatee()));
- deliverEventQueue.start();
+ deliverEventQueue.start();
}
// Process each frame through the error checker.
if (error.isUnresolved()) {
@@ -515,7 +517,7 @@ void Cluster::deliveredFrame(const EventFrame& efConst) {
while (error.canProcess()) // There is a frame ready to process.
processFrame(error.getNext(), l);
}
- else
+ else
processFrame(e, l);
}
@@ -577,7 +579,7 @@ Cluster::ConnectionVector Cluster::getConnections(Lock&) {
}
// CPG config-change callback.
-void Cluster::configChange (
+void Cluster::configChange (
cpg_handle_t /*handle*/,
const cpg_name */*group*/,
const cpg_address *members, int nMembers,
@@ -607,7 +609,7 @@ void Cluster::setReady(Lock&) {
}
// Set the management status from the Cluster::state.
-//
+//
// NOTE: Management updates are sent based on property changes. In
// order to keep consistency across the cluster, we touch the local
// management status property even if it is locally unchanged for any
@@ -618,7 +620,7 @@ void Cluster::setMgmtStatus(Lock&) {
}
void Cluster::initMapCompleted(Lock& l) {
- // Called on completion of the initial status map.
+ // Called on completion of the initial status map.
QPID_LOG(debug, *this << " initial status map complete. ");
setMgmtStatus(l);
if (state == PRE_INIT) {
@@ -701,8 +703,8 @@ void Cluster::configChange(const MemberId&,
if (initMap.isResendNeeded()) {
mcast.mcastControl(
ClusterInitialStatusBody(
- ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId,
- store.getState(), store.getShutdownId(),
+ ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId,
+ store.getState(), store.getShutdownId(),
initMap.getFirstConfigStr()
),
self);
@@ -759,7 +761,7 @@ std::string Cluster::debugSnapshot() {
// point we know the poller has stopped so no poller callbacks will be
// invoked. We must ensure that CPG has also shut down so no CPG
// callbacks will be invoked.
-//
+//
void Cluster::brokerShutdown() {
sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts.
try { cpg.shutdown(); }
@@ -775,7 +777,7 @@ void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l)
}
void Cluster::initialStatus(const MemberId& member, uint32_t version, bool active,
- const framing::Uuid& id,
+ const framing::Uuid& id,
framing::cluster::StoreState store,
const framing::Uuid& shutdownId,
const std::string& firstConfig,
@@ -969,7 +971,7 @@ void Cluster::updateOutDone(Lock& l) {
void Cluster::updateOutError(const std::exception& e) {
Monitor::ScopedLock l(lock);
- QPID_LOG(error, *this << " error sending update: " << e.what());
+ QPID_LOG(error, *this << " error sending update: " << e.what());
updateOutDone(l);
}
@@ -1067,7 +1069,7 @@ void Cluster::memberUpdate(Lock& l) {
void Cluster::updateMgmtMembership(Lock& l) {
if (!mgmtObject) return;
std::vector<Url> urls = getUrls(l);
- mgmtObject->set_clusterSize(urls.size());
+ mgmtObject->set_clusterSize(urls.size());
string urlstr;
for(std::vector<Url>::iterator i = urls.begin(); i != urls.end(); i++ ) {
if (i != urls.begin()) urlstr += ";";
diff --git a/qpid/cpp/src/qpid/cluster/Event.h b/qpid/cpp/src/qpid/cluster/Event.h
index 07f74d3ba5..c2dca073d1 100644
--- a/qpid/cpp/src/qpid/cluster/Event.h
+++ b/qpid/cpp/src/qpid/cluster/Event.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -53,7 +53,7 @@ class EventHeader {
/** Size of payload data, excluding header. */
size_t getSize() const { return size; }
- /** Size of header + payload. */
+ /** Size of header + payload. */
size_t getStoreSize() const { return size + HEADER_SIZE; }
bool isCluster() const { return connectionId.getNumber() == 0; }
@@ -62,7 +62,7 @@ class EventHeader {
protected:
static const size_t HEADER_SIZE;
-
+
EventType type;
ConnectionId connectionId;
size_t size;
@@ -86,7 +86,7 @@ class Event : public EventHeader {
/** Create a control event. */
static Event control(const framing::AMQFrame&, const ConnectionId&);
-
+
// Data excluding header.
char* getData() { return store + HEADER_SIZE; }
const char* getData() const { return store + HEADER_SIZE; }
@@ -95,12 +95,12 @@ class Event : public EventHeader {
char* getStore() { return store; }
const char* getStore() const { return store; }
- const framing::AMQFrame& getFrame() const;
-
+ const framing::AMQFrame& getFrame() const;
+
operator framing::Buffer() const;
iovec toIovec() const;
-
+
private:
void encodeHeader() const;
diff --git a/qpid/cpp/src/qpid/cluster/EventFrame.h b/qpid/cpp/src/qpid/cluster/EventFrame.h
index 61447c5525..6b702a9bf8 100644
--- a/qpid/cpp/src/qpid/cluster/EventFrame.h
+++ b/qpid/cpp/src/qpid/cluster/EventFrame.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -48,7 +48,7 @@ struct EventFrame
ConnectionId connectionId;
- framing::AMQFrame frame;
+ framing::AMQFrame frame;
int readCredit; ///< last frame in an event, give credit when processed.
EventType type;
};
diff --git a/qpid/cpp/src/qpid/sys/ClusterSafe.cpp b/qpid/cpp/src/qpid/sys/ClusterSafe.cpp
index c6b527dfdf..b67b04c267 100644
--- a/qpid/cpp/src/qpid/sys/ClusterSafe.cpp
+++ b/qpid/cpp/src/qpid/sys/ClusterSafe.cpp
@@ -34,8 +34,6 @@ QPID_TSS bool inContext = false;
bool isClusterSafe() { return !inCluster || inContext; }
-bool isCluster() { return inCluster; }
-
void assertClusterSafe() {
if (!isClusterSafe()) {
QPID_LOG(critical, "Modified cluster state outside of cluster context");
diff --git a/qpid/cpp/src/qpid/sys/ClusterSafe.h b/qpid/cpp/src/qpid/sys/ClusterSafe.h
index 15675e8cc5..42e290f4c8 100644
--- a/qpid/cpp/src/qpid/sys/ClusterSafe.h
+++ b/qpid/cpp/src/qpid/sys/ClusterSafe.h
@@ -52,9 +52,6 @@ QPID_COMMON_EXTERN void assertClusterSafe();
*/
QPID_COMMON_EXTERN bool isClusterSafe();
-/** Return true in a clustered broker */
-QPID_COMMON_EXTERN bool isCluster();
-
/**
* Base class for classes that encapsulate state which is replicated
* to all members of a cluster. Acts as a marker for clustered state