summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-10-07 17:27:06 +0000
committerAlan Conway <aconway@apache.org>2008-10-07 17:27:06 +0000
commit41d33af55b9fbf4c664ccb56accb1a37bd1ef006 (patch)
treede5e5b5e431bf695b2c44e198ee93d179201a0e2 /cpp/src/qpid
parenta653ebe5bdfad1d44a576d2ab23f7e6ea80ba96f (diff)
downloadqpid-python-41d33af55b9fbf4c664ccb56accb1a37bd1ef006.tar.gz
broker: Fixed incorrect pass-by-reference of Queue::shared_ptr in several files.
cluster: added FailoverExchange - send cluster membership to clients. client: added FailoverListener - receive cluster updates from failover exchange. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@702552 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/broker/Deliverable.h2
-rw-r--r--cpp/src/qpid/broker/DeliverableMessage.cpp4
-rw-r--r--cpp/src/qpid/broker/DeliverableMessage.h4
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.cpp4
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.h5
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.cpp2
-rw-r--r--cpp/src/qpid/broker/TxPublish.cpp6
-rw-r--r--cpp/src/qpid/broker/TxPublish.h6
-rw-r--r--cpp/src/qpid/client/FailoverListener.cpp59
-rw-r--r--cpp/src/qpid/client/FailoverListener.h58
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp36
-rw-r--r--cpp/src/qpid/cluster/Cluster.h15
-rw-r--r--cpp/src/qpid/cluster/ClusterMap.cpp10
-rw-r--r--cpp/src/qpid/cluster/ClusterMap.h10
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp7
-rw-r--r--cpp/src/qpid/cluster/FailoverExchange.cpp99
-rw-r--r--cpp/src/qpid/cluster/FailoverExchange.h68
-rw-r--r--cpp/src/qpid/framing/Array.cpp2
-rw-r--r--cpp/src/qpid/framing/Array.h6
-rw-r--r--cpp/src/qpid/framing/FrameSet.cpp2
20 files changed, 362 insertions, 43 deletions
diff --git a/cpp/src/qpid/broker/Deliverable.h b/cpp/src/qpid/broker/Deliverable.h
index c40780c4ae..e0ceeb2408 100644
--- a/cpp/src/qpid/broker/Deliverable.h
+++ b/cpp/src/qpid/broker/Deliverable.h
@@ -33,7 +33,7 @@ namespace qpid {
virtual Message& getMessage() = 0;
- virtual void deliverTo(Queue::shared_ptr& queue) = 0;
+ virtual void deliverTo(const boost::shared_ptr<Queue>& queue) = 0;
virtual uint64_t contentSize() { return 0; }
virtual ~Deliverable(){}
};
diff --git a/cpp/src/qpid/broker/DeliverableMessage.cpp b/cpp/src/qpid/broker/DeliverableMessage.cpp
index fd15acf464..5fff54b329 100644
--- a/cpp/src/qpid/broker/DeliverableMessage.cpp
+++ b/cpp/src/qpid/broker/DeliverableMessage.cpp
@@ -22,11 +22,11 @@
using namespace qpid::broker;
-DeliverableMessage::DeliverableMessage(boost::intrusive_ptr<Message>& _msg) : msg(_msg)
+DeliverableMessage::DeliverableMessage(const boost::intrusive_ptr<Message>& _msg) : msg(_msg)
{
}
-void DeliverableMessage::deliverTo(Queue::shared_ptr& queue)
+void DeliverableMessage::deliverTo(const boost::shared_ptr<Queue>& queue)
{
queue->deliver(msg);
delivered = true;
diff --git a/cpp/src/qpid/broker/DeliverableMessage.h b/cpp/src/qpid/broker/DeliverableMessage.h
index 18e1ec5e29..f5db473c22 100644
--- a/cpp/src/qpid/broker/DeliverableMessage.h
+++ b/cpp/src/qpid/broker/DeliverableMessage.h
@@ -32,8 +32,8 @@ namespace qpid {
class DeliverableMessage : public Deliverable{
boost::intrusive_ptr<Message> msg;
public:
- DeliverableMessage(boost::intrusive_ptr<Message>& msg);
- virtual void deliverTo(Queue::shared_ptr& queue);
+ DeliverableMessage(const boost::intrusive_ptr<Message>& msg);
+ virtual void deliverTo(const boost::shared_ptr<Queue>& queue);
Message& getMessage();
uint64_t contentSize();
virtual ~DeliverableMessage(){}
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp
index 45eb308680..309e88e8be 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.cpp
+++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp
@@ -97,6 +97,10 @@ Exchange::shared_ptr ExchangeRegistry::get(const string& name){
return i->second;
}
+bool ExchangeRegistry::registerExchange(const Exchange::shared_ptr& ex) {
+ return exchanges.insert(ExchangeMap::value_type(ex->getName(), ex)).second;
+}
+
void ExchangeRegistry::registerType(const std::string& type, FactoryFunction f)
{
factory[type] = f;
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.h b/cpp/src/qpid/broker/ExchangeRegistry.h
index 58cbca3d92..787b7896f0 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.h
+++ b/cpp/src/qpid/broker/ExchangeRegistry.h
@@ -59,6 +59,11 @@ class ExchangeRegistry{
*/
void setParent (management::Manageable* _parent) { parent = _parent; }
+ /** Register an exchange instance.
+ *@return true if registered, false if exchange with same name is already registered.
+ */
+ bool registerExchange(const Exchange::shared_ptr&);
+
void registerType(const std::string& type, FactoryFunction);
/** Call f for each exchange in the registry. */
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index b058978ccf..4103795087 100644
--- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -59,7 +59,7 @@ class RecoverableQueueImpl : public RecoverableQueue
{
Queue::shared_ptr queue;
public:
- RecoverableQueueImpl(Queue::shared_ptr& _queue) : queue(_queue) {}
+ RecoverableQueueImpl(const boost::shared_ptr<Queue>& _queue) : queue(_queue) {}
~RecoverableQueueImpl() {};
void setPersistenceId(uint64_t id);
uint64_t getPersistenceId() const;
diff --git a/cpp/src/qpid/broker/TxPublish.cpp b/cpp/src/qpid/broker/TxPublish.cpp
index dcee00e803..25ac691ada 100644
--- a/cpp/src/qpid/broker/TxPublish.cpp
+++ b/cpp/src/qpid/broker/TxPublish.cpp
@@ -45,7 +45,7 @@ void TxPublish::commit() throw(){
void TxPublish::rollback() throw(){
}
-void TxPublish::deliverTo(Queue::shared_ptr& queue){
+void TxPublish::deliverTo(const boost::shared_ptr<Queue>& queue){
if (!queue->isLocal(msg)) {
queues.push_back(queue);
delivered = true;
@@ -57,7 +57,7 @@ void TxPublish::deliverTo(Queue::shared_ptr& queue){
TxPublish::Prepare::Prepare(TransactionContext* _ctxt, intrusive_ptr<Message>& _msg)
: ctxt(_ctxt), msg(_msg){}
-void TxPublish::Prepare::operator()(Queue::shared_ptr& queue){
+void TxPublish::Prepare::operator()(const boost::shared_ptr<Queue>& queue){
if (!queue->enqueue(ctxt, msg)){
/**
* if not store then mark message for ack and deleivery once
@@ -70,7 +70,7 @@ void TxPublish::Prepare::operator()(Queue::shared_ptr& queue){
TxPublish::Commit::Commit(intrusive_ptr<Message>& _msg) : msg(_msg){}
-void TxPublish::Commit::operator()(Queue::shared_ptr& queue){
+void TxPublish::Commit::operator()(const boost::shared_ptr<Queue>& queue){
queue->process(msg);
}
diff --git a/cpp/src/qpid/broker/TxPublish.h b/cpp/src/qpid/broker/TxPublish.h
index d2590debfb..018437f1ed 100644
--- a/cpp/src/qpid/broker/TxPublish.h
+++ b/cpp/src/qpid/broker/TxPublish.h
@@ -51,14 +51,14 @@ namespace qpid {
boost::intrusive_ptr<Message>& msg;
public:
Prepare(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg);
- void operator()(Queue::shared_ptr& queue);
+ void operator()(const boost::shared_ptr<Queue>& queue);
};
class Commit{
boost::intrusive_ptr<Message>& msg;
public:
Commit(boost::intrusive_ptr<Message>& msg);
- void operator()(Queue::shared_ptr& queue);
+ void operator()(const boost::shared_ptr<Queue>& queue);
};
boost::intrusive_ptr<Message> msg;
@@ -72,7 +72,7 @@ namespace qpid {
virtual Message& getMessage() { return *msg; };
- virtual void deliverTo(Queue::shared_ptr& queue);
+ virtual void deliverTo(const boost::shared_ptr<Queue>& queue);
virtual ~TxPublish(){}
diff --git a/cpp/src/qpid/client/FailoverListener.cpp b/cpp/src/qpid/client/FailoverListener.cpp
new file mode 100644
index 0000000000..95c137d922
--- /dev/null
+++ b/cpp/src/qpid/client/FailoverListener.cpp
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "FailoverListener.h"
+
+namespace qpid {
+namespace client {
+
+static const std::string AMQ_FAILOVER("amq.failover");
+
+FailoverListener::FailoverListener(Connection c)
+ : connection(c), session(c.newSession()), subscriptions(session)
+{
+ std::string qname=AMQ_FAILOVER + "." + session.getId().getName();
+ if (session.exchangeQuery(arg::exchange=AMQ_FAILOVER).getType().empty())
+ return; // Failover exchange not implemented.
+ session.queueDeclare(arg::queue=qname, arg::exclusive=true, arg::autoDelete=true);
+ session.exchangeBind(arg::queue=qname, arg::exchange=AMQ_FAILOVER);
+ subscriptions.subscribe(*this, qname, FlowControl::unlimited());
+ thread = sys::Thread(subscriptions);
+}
+
+FailoverListener::~FailoverListener() {
+ subscriptions.stop();
+ if (thread.id()) thread.join();
+}
+
+void FailoverListener::received(Message& msg) {
+ sys::Mutex::ScopedLock l(lock);
+ knowBrokers.clear();
+ framing::Array urlArray;
+ msg.getHeaders().getArray("amq.failover", urlArray);
+ for (framing::Array::ValueVector::const_iterator i = urlArray.begin(); i < urlArray.end(); ++i )
+ knowBrokers.push_back(Url((*i)->get<std::string>()));
+}
+
+std::vector<Url> FailoverListener::getKnownBrokers() const {
+ sys::Mutex::ScopedLock l(lock);
+ return knowBrokers;
+}
+
+}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/FailoverListener.h b/cpp/src/qpid/client/FailoverListener.h
new file mode 100644
index 0000000000..2c06947300
--- /dev/null
+++ b/cpp/src/qpid/client/FailoverListener.h
@@ -0,0 +1,58 @@
+#ifndef QPID_CLIENT_FAILOVERLISTENER_H
+#define QPID_CLIENT_FAILOVERLISTENER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/Connection.h"
+#include "qpid/client/Session.h"
+#include "qpid/client/MessageListener.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/Url.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Thread.h"
+#include <vector>
+
+namespace qpid {
+namespace client {
+
+/**
+ * @internal Listen for failover updates from the amq.failover exchange.
+ */
+class FailoverListener : public MessageListener
+{
+ public:
+ FailoverListener(Connection);
+ ~FailoverListener();
+ std::vector<Url> getKnownBrokers() const;
+ void received(Message& msg);
+
+ private:
+ mutable sys::Mutex lock;
+ Connection connection;
+ Session session;
+ SubscriptionManager subscriptions;
+ sys::Thread thread;
+ std::vector<Url> knowBrokers;
+};
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_FAILOVERLISTENER_H*/
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 9c503d6d13..e64692bc91 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -19,6 +19,7 @@
#include "Cluster.h"
#include "Connection.h"
#include "DumpClient.h"
+#include "FailoverExchange.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/SessionState.h"
@@ -109,6 +110,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
// FIXME aconway 2008-09-24:
// if first cluster up set new UUID to set_clusterID() else set UUID of cluster being joined.
}
+ failoverExchange.reset(new FailoverExchange(this));
broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
cpgDispatchHandle.startWatch(poller);
cpg.join(name);
@@ -331,15 +333,15 @@ void Cluster::configChange (
Mutex::ScopedLock l(lock);
QPID_LOG(debug, *this << " configuration change: " << AddrList(current, nCurrent)
<< AddrList(left, nLeft, "( ", ")"));
- map.configChange(current, nCurrent, left, nLeft, joined, nJoined);
- updateMemberStats(l);
+ bool changed = map.configChange(current, nCurrent, left, nLeft, joined, nJoined);
if (state == LEFT) return;
if (!map.isAlive(memberId)) { leave(l); return; }
-
+
if(state == INIT) { // First configChange
if (map.aliveCount() == 1) {
QPID_LOG(info, *this << " first in cluster at " << myUrl);
map = ClusterMap(memberId, myUrl, true);
+ memberUpdate(l);
unstall(l);
}
else { // Joining established group.
@@ -348,6 +350,8 @@ void Cluster::configChange (
QPID_LOG(debug, *this << " send dump-request " << myUrl);
}
}
+ else if (state >= READY && changed)
+ memberUpdate(l);
}
void Cluster::dumpInDone(const ClusterMap& m) {
@@ -403,8 +407,9 @@ void Cluster::dumpRequest(const MemberId& id, const std::string& url, Lock& l) {
tryMakeOffer(id, l);
}
-void Cluster::ready(const MemberId& id, const std::string& url, Lock&) {
- map.ready(id, Url(url));
+void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
+ if (map.ready(id, Url(url)))
+ memberUpdate(l);
}
void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, Lock& l) {
@@ -454,8 +459,8 @@ void Cluster::checkDumpIn(Lock& l) {
if (state == DUMPEE && dumpedMap) {
map = *dumpedMap;
QPID_LOG(debug, *this << " incoming dump complete. Members: " << map);
- unstall(l);
mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), 0, l);
+ unstall(l);
}
}
@@ -488,28 +493,31 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string
Lock l(lock);
QPID_LOG (debug, *this << " managementMethod [id=" << methodId << "]");
switch (methodId) {
- case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(); break;
- case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(); break;
+ case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(l); break;
+ case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(l); break;
default: return Manageable::STATUS_UNKNOWN_METHOD;
}
return Manageable::STATUS_OK;
}
-void Cluster::stopClusterNode() {
+void Cluster::stopClusterNode(Lock&) {
QPID_LOG(notice, *this << " stopped by admin");
leave();
}
-void Cluster::stopFullCluster() {
- Lock l(lock);
+void Cluster::stopFullCluster(Lock& l) {
QPID_LOG(notice, *this << " shutting down cluster " << name.str());
mcastControl(ClusterShutdownBody(), 0, l);
}
-void Cluster::updateMemberStats(Lock& l) {
+void Cluster::memberUpdate(Lock& l) {
+ std::vector<Url> vectUrl = getUrls(l);
+ size_t size = vectUrl.size();
+
+ failoverExchange->setUrls(vectUrl);
+
if (mgmtObject) {
- std::vector<Url> vectUrl = getUrls(l);
- size_t size = vectUrl.size();
+
if (lastSize != size && size == 1){
QPID_LOG(info, *this << " last node standing, updating queue policies.");
broker.getQueues().updateQueueClusterState(true);
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index d1cf4b752f..723a23d1bd 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -23,6 +23,7 @@
#include "Event.h"
#include "NoOpConnectionOutputHandler.h"
#include "ClusterMap.h"
+#include "FailoverExchange.h"
#include "qpid/broker/Broker.h"
#include "qpid/sys/PollableQueue.h"
@@ -74,6 +75,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
// URLs of current cluster members.
std::vector<Url> getUrls() const;
+ boost::shared_ptr<FailoverExchange> getFailoverExchange() const { return failoverExchange; }
// Leave the cluster
void leave();
@@ -93,6 +95,11 @@ class Cluster : private Cpg::Handler, public management::Manageable {
typedef sys::PollableQueue<Event> PollableEventQueue;
typedef std::deque<Event> PlainEventQueue;
+ // NB: The final Lock& parameter on functions below is used to mark functions
+ // that should only be called by a function that already holds the lock.
+ // The parameter makes it hard to forget since you have to have an instance of
+ // a Lock to call the unlocked functions.
+
// Unlocked versions of public functions
void mcastControl(const framing::AMQBody& controlBody, Connection* cptr, Lock&);
void mcastBuffer(const char*, size_t, const ConnectionId&, uint32_t id, Lock&);
@@ -145,9 +152,10 @@ class Cluster : private Cpg::Handler, public management::Manageable {
virtual qpid::management::ManagementObject* GetManagementObject() const;
virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
- void stopClusterNode();
- void stopFullCluster();
- void updateMemberStats(Lock&);
+
+ void stopClusterNode(Lock&);
+ void stopFullCluster(Lock&);
+ void memberUpdate(Lock&);
// Called in connection IO threads .
void checkDumpIn(Lock&);
@@ -181,6 +189,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
boost::optional<ClusterMap> dumpedMap;
size_t lastSize;
+ boost::shared_ptr<FailoverExchange> failoverExchange;
friend std::ostream& operator<<(std::ostream&, const Cluster&);
friend class ClusterDispatcher;
diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp
index f3b5451afb..e2fc25bfaa 100644
--- a/cpp/src/qpid/cluster/ClusterMap.cpp
+++ b/cpp/src/qpid/cluster/ClusterMap.cpp
@@ -72,18 +72,20 @@ ClusterMap::ClusterMap(const FieldTable& newbiesFt, const FieldTable& membersFt)
std::for_each(members.begin(), members.end(), boost::bind(&insertSet, boost::ref(alive), _1));
}
-void ClusterMap::configChange(
+bool ClusterMap::configChange(
cpg_address *current, int nCurrent,
cpg_address *left, int nLeft,
cpg_address */*joined*/, int /*nJoined*/)
{
cpg_address* a;
+ bool memberChange=false;
for (a = left; a != left+nLeft; ++a) {
- members.erase(*a);
+ memberChange = members.erase(*a);
newbies.erase(*a);
}
alive.clear();
std::copy(current, current+nCurrent, std::inserter(alive, alive.end()));
+ return memberChange;
}
Url ClusterMap::getUrl(const Map& map, const MemberId& id) {
@@ -133,8 +135,8 @@ bool ClusterMap::dumpRequest(const MemberId& id, const std::string& url) {
return false;
}
-void ClusterMap::ready(const MemberId& id, const Url& url) {
- if (isAlive(id)) members[id] = url;
+bool ClusterMap::ready(const MemberId& id, const Url& url) {
+ return isAlive(id) && members.insert(Map::value_type(id,url)).second;
}
boost::optional<Url> ClusterMap::dumpOffer(const MemberId& from, const MemberId& to) {
diff --git a/cpp/src/qpid/cluster/ClusterMap.h b/cpp/src/qpid/cluster/ClusterMap.h
index 79afba7dc0..c0012facaf 100644
--- a/cpp/src/qpid/cluster/ClusterMap.h
+++ b/cpp/src/qpid/cluster/ClusterMap.h
@@ -50,8 +50,10 @@ class ClusterMap {
ClusterMap(const MemberId& id, const Url& url, bool isReady);
ClusterMap(const framing::FieldTable& urls, const framing::FieldTable& states);
- /** Update from config change. */
- void configChange(
+ /** Update from config change.
+ *@return true if member set changed.
+ */
+ bool configChange(
cpg_address *current, int nCurrent,
cpg_address *left, int nLeft,
cpg_address *joined, int nJoined);
@@ -76,7 +78,9 @@ class ClusterMap {
bool dumpRequest(const MemberId& id, const std::string& url);
/** Return non-empty Url if accepted */
boost::optional<Url> dumpOffer(const MemberId& from, const MemberId& to);
- void ready(const MemberId& id, const Url&);
+
+ /**@return true If this is a new member */
+ bool ready(const MemberId& id, const Url&);
private:
Url getUrl(const Map& map, const MemberId& id);
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 8457467196..14a666a1c6 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -34,6 +34,7 @@ namespace qpid {
namespace cluster {
using namespace std;
+using broker::Broker;
struct ClusterValues {
string name;
@@ -74,12 +75,14 @@ struct ClusterPlugin : public Plugin {
Options* getOptions() { return &options; }
void initialize(Plugin::Target& target) {
- broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
- if (!broker || values.name.empty()) return; // Only if --cluster-name option was specified.
+ if (values.name.empty()) return; // Only if --cluster-name option was specified.
+ Broker* broker = dynamic_cast<Broker*>(&target);
+ if (!broker) return;
cluster = new Cluster(values.name, values.getUrl(broker->getPort()), *broker);
broker->setConnectionFactory(
boost::shared_ptr<sys::ConnectionCodec::Factory>(
new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster)));
+ broker->getExchanges().registerExchange(cluster->getFailoverExchange());
}
void earlyInitialize(Plugin::Target&) {}
diff --git a/cpp/src/qpid/cluster/FailoverExchange.cpp b/cpp/src/qpid/cluster/FailoverExchange.cpp
new file mode 100644
index 0000000000..abc7f5df6f
--- /dev/null
+++ b/cpp/src/qpid/cluster/FailoverExchange.cpp
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "FailoverExchange.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/Array.h"
+#include <boost/bind.hpp>
+#include <algorithm>
+
+namespace qpid {
+namespace cluster {
+using namespace std;
+
+using namespace broker;
+using namespace framing;
+
+const string FailoverExchange::TYPE_NAME("amq.failover");
+
+FailoverExchange::FailoverExchange(management::Manageable* parent) : Exchange(TYPE_NAME, parent) {
+ if (mgmtExchange != 0)
+ mgmtExchange->set_type(TYPE_NAME);
+}
+
+
+void FailoverExchange::setUrls(const vector<Url>& u) {
+ Lock l(lock);
+ urls=u;
+ if (urls.empty()) return;
+ std::for_each(queues.begin(), queues.end(),
+ boost::bind(&FailoverExchange::sendUpdate, this, _1));
+}
+
+string FailoverExchange::getType() const { return TYPE_NAME; }
+
+bool FailoverExchange::bind(Queue::shared_ptr queue, const string&, const framing::FieldTable*) {
+ Lock l(lock);
+ sendUpdate(queue);
+ return queues.insert(queue).second;
+}
+
+bool FailoverExchange::unbind(Queue::shared_ptr queue, const string&, const framing::FieldTable*) {
+ Lock l(lock);
+ return queues.erase(queue);
+}
+
+bool FailoverExchange::isBound(Queue::shared_ptr queue, const string* const, const framing::FieldTable*) {
+ Lock l(lock);
+ return queues.find(queue) != queues.end();
+}
+
+void FailoverExchange::route(Deliverable&, const string& , const framing::FieldTable* ) {
+ QPID_LOG(warning, "Message received by exchange " << TYPE_NAME << " ignoring");
+}
+
+void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue) {
+ // Called with lock held.
+ if (urls.empty()) return;
+ framing::Array array(0x95); // FIXME aconway 2008-10-06: Array is unusable like this. Need type constants or better mapping.
+ for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i)
+ array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str())));
+ const ProtocolVersion v;
+ boost::intrusive_ptr<Message> msg(new Message);
+ AMQFrame command(MessageTransferBody(v, TYPE_NAME, 1, 0));
+ command.setLastSegment(false);
+ msg->getFrames().append(command);
+ AMQHeaderBody header;
+ header.get<MessageProperties>(true)->setContentLength(0);
+ header.get<MessageProperties>(true)->getApplicationHeaders().setArray(TYPE_NAME, array);
+ AMQFrame headerFrame(header);
+ headerFrame.setFirstSegment(false);
+ msg->getFrames().append(headerFrame);
+ DeliverableMessage(msg).deliverTo(queue);
+}
+
+}} // namespace cluster
diff --git a/cpp/src/qpid/cluster/FailoverExchange.h b/cpp/src/qpid/cluster/FailoverExchange.h
new file mode 100644
index 0000000000..738cd2a602
--- /dev/null
+++ b/cpp/src/qpid/cluster/FailoverExchange.h
@@ -0,0 +1,68 @@
+#ifndef QPID_CLUSTER_FAILOVEREXCHANGE_H
+#define QPID_CLUSTER_FAILOVEREXCHANGE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/Url.h"
+
+#include <vector>
+#include <set>
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Failover exchange provides failover host list, as specified in AMQP 0-10.
+ */
+class FailoverExchange : public broker::Exchange
+{
+ public:
+ static const std::string TYPE_NAME;
+
+ FailoverExchange(management::Manageable* parent);
+
+ void setUrls(const std::vector<Url>&);
+
+ // Exchange overrides
+ std::string getType() const;
+ bool bind(broker::Queue::shared_ptr queue, const std::string& routingKey, const framing::FieldTable* args);
+ bool unbind(broker::Queue::shared_ptr queue, const std::string& routingKey, const framing::FieldTable* args);
+ bool isBound(broker::Queue::shared_ptr queue, const std::string* const routingKey, const framing::FieldTable* const args);
+ void route(broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args);
+
+ private:
+ void sendUpdate(const broker::Queue::shared_ptr&);
+
+ typedef sys::Mutex::ScopedLock Lock;
+ typedef std::vector<Url> Urls;
+ typedef std::set<broker::Queue::shared_ptr> Queues;
+
+ sys::Mutex lock;
+ Urls urls;
+ Queues queues;
+
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_FAILOVEREXCHANGE_H*/
diff --git a/cpp/src/qpid/framing/Array.cpp b/cpp/src/qpid/framing/Array.cpp
index bcee85f472..42d05f71c9 100644
--- a/cpp/src/qpid/framing/Array.cpp
+++ b/cpp/src/qpid/framing/Array.cpp
@@ -117,7 +117,7 @@ bool Array::operator==(const Array& x) const {
void Array::add(ValuePtr value)
{
if (typeOctet != value->getType()) {
- throw IllegalArgumentException(QPID_MSG("Wrong type of value, expected " << typeOctet));
+ throw IllegalArgumentException(QPID_MSG("Wrong type of value in Array, expected " << typeOctet << " but found " << value->getType()));
}
values.push_back(value);
}
diff --git a/cpp/src/qpid/framing/Array.h b/cpp/src/qpid/framing/Array.h
index b18b86fa35..d3ca04dd1d 100644
--- a/cpp/src/qpid/framing/Array.h
+++ b/cpp/src/qpid/framing/Array.h
@@ -61,13 +61,13 @@ class Array
}
}
+ ValueVector::const_iterator begin() const { return values.begin(); }
+ ValueVector::const_iterator end() const { return values.end(); }
+
private:
uint8_t typeOctet;
ValueVector values;
- ValueVector::const_iterator begin() const { return values.begin(); }
- ValueVector::const_iterator end() const { return values.end(); }
-
friend std::ostream& operator<<(std::ostream& out, const Array& body);
};
diff --git a/cpp/src/qpid/framing/FrameSet.cpp b/cpp/src/qpid/framing/FrameSet.cpp
index 5326ab7c14..9846f1d42b 100644
--- a/cpp/src/qpid/framing/FrameSet.cpp
+++ b/cpp/src/qpid/framing/FrameSet.cpp
@@ -33,7 +33,7 @@ FrameSet::FrameSet(const SequenceNumber& _id) : id(_id),contentSize(0),recalcula
void FrameSet::append(const AMQFrame& part)
{
parts.push_back(part);
- recalculateSize = true;
+ recalculateSize = true;
}
bool FrameSet::isComplete() const