diff options
author | Alan Conway <aconway@apache.org> | 2008-10-07 17:27:06 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-10-07 17:27:06 +0000 |
commit | 41d33af55b9fbf4c664ccb56accb1a37bd1ef006 (patch) | |
tree | de5e5b5e431bf695b2c44e198ee93d179201a0e2 /cpp/src/qpid | |
parent | a653ebe5bdfad1d44a576d2ab23f7e6ea80ba96f (diff) | |
download | qpid-python-41d33af55b9fbf4c664ccb56accb1a37bd1ef006.tar.gz |
broker: Fixed incorrect pass-by-reference of Queue::shared_ptr in several files.
cluster: added FailoverExchange - send cluster membership to clients.
client: added FailoverListener - receive cluster updates from failover exchange.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@702552 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r-- | cpp/src/qpid/broker/Deliverable.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DeliverableMessage.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DeliverableMessage.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ExchangeRegistry.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ExchangeRegistry.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxPublish.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxPublish.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/client/FailoverListener.cpp | 59 | ||||
-rw-r--r-- | cpp/src/qpid/client/FailoverListener.h | 58 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 36 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 15 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterMap.cpp | 10 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterMap.h | 10 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/FailoverExchange.cpp | 99 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/FailoverExchange.h | 68 | ||||
-rw-r--r-- | cpp/src/qpid/framing/Array.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/framing/Array.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/framing/FrameSet.cpp | 2 |
20 files changed, 362 insertions, 43 deletions
diff --git a/cpp/src/qpid/broker/Deliverable.h b/cpp/src/qpid/broker/Deliverable.h index c40780c4ae..e0ceeb2408 100644 --- a/cpp/src/qpid/broker/Deliverable.h +++ b/cpp/src/qpid/broker/Deliverable.h @@ -33,7 +33,7 @@ namespace qpid { virtual Message& getMessage() = 0; - virtual void deliverTo(Queue::shared_ptr& queue) = 0; + virtual void deliverTo(const boost::shared_ptr<Queue>& queue) = 0; virtual uint64_t contentSize() { return 0; } virtual ~Deliverable(){} }; diff --git a/cpp/src/qpid/broker/DeliverableMessage.cpp b/cpp/src/qpid/broker/DeliverableMessage.cpp index fd15acf464..5fff54b329 100644 --- a/cpp/src/qpid/broker/DeliverableMessage.cpp +++ b/cpp/src/qpid/broker/DeliverableMessage.cpp @@ -22,11 +22,11 @@ using namespace qpid::broker; -DeliverableMessage::DeliverableMessage(boost::intrusive_ptr<Message>& _msg) : msg(_msg) +DeliverableMessage::DeliverableMessage(const boost::intrusive_ptr<Message>& _msg) : msg(_msg) { } -void DeliverableMessage::deliverTo(Queue::shared_ptr& queue) +void DeliverableMessage::deliverTo(const boost::shared_ptr<Queue>& queue) { queue->deliver(msg); delivered = true; diff --git a/cpp/src/qpid/broker/DeliverableMessage.h b/cpp/src/qpid/broker/DeliverableMessage.h index 18e1ec5e29..f5db473c22 100644 --- a/cpp/src/qpid/broker/DeliverableMessage.h +++ b/cpp/src/qpid/broker/DeliverableMessage.h @@ -32,8 +32,8 @@ namespace qpid { class DeliverableMessage : public Deliverable{ boost::intrusive_ptr<Message> msg; public: - DeliverableMessage(boost::intrusive_ptr<Message>& msg); - virtual void deliverTo(Queue::shared_ptr& queue); + DeliverableMessage(const boost::intrusive_ptr<Message>& msg); + virtual void deliverTo(const boost::shared_ptr<Queue>& queue); Message& getMessage(); uint64_t contentSize(); virtual ~DeliverableMessage(){} diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp index 45eb308680..309e88e8be 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -97,6 +97,10 @@ Exchange::shared_ptr ExchangeRegistry::get(const string& name){ return i->second; } +bool ExchangeRegistry::registerExchange(const Exchange::shared_ptr& ex) { + return exchanges.insert(ExchangeMap::value_type(ex->getName(), ex)).second; +} + void ExchangeRegistry::registerType(const std::string& type, FactoryFunction f) { factory[type] = f; diff --git a/cpp/src/qpid/broker/ExchangeRegistry.h b/cpp/src/qpid/broker/ExchangeRegistry.h index 58cbca3d92..787b7896f0 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.h +++ b/cpp/src/qpid/broker/ExchangeRegistry.h @@ -59,6 +59,11 @@ class ExchangeRegistry{ */ void setParent (management::Manageable* _parent) { parent = _parent; } + /** Register an exchange instance. + *@return true if registered, false if exchange with same name is already registered. + */ + bool registerExchange(const Exchange::shared_ptr&); + void registerType(const std::string& type, FactoryFunction); /** Call f for each exchange in the registry. */ diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index b058978ccf..4103795087 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -59,7 +59,7 @@ class RecoverableQueueImpl : public RecoverableQueue { Queue::shared_ptr queue; public: - RecoverableQueueImpl(Queue::shared_ptr& _queue) : queue(_queue) {} + RecoverableQueueImpl(const boost::shared_ptr<Queue>& _queue) : queue(_queue) {} ~RecoverableQueueImpl() {}; void setPersistenceId(uint64_t id); uint64_t getPersistenceId() const; diff --git a/cpp/src/qpid/broker/TxPublish.cpp b/cpp/src/qpid/broker/TxPublish.cpp index dcee00e803..25ac691ada 100644 --- a/cpp/src/qpid/broker/TxPublish.cpp +++ b/cpp/src/qpid/broker/TxPublish.cpp @@ -45,7 +45,7 @@ void TxPublish::commit() throw(){ void TxPublish::rollback() throw(){ } -void TxPublish::deliverTo(Queue::shared_ptr& queue){ +void TxPublish::deliverTo(const boost::shared_ptr<Queue>& queue){ if (!queue->isLocal(msg)) { queues.push_back(queue); delivered = true; @@ -57,7 +57,7 @@ void TxPublish::deliverTo(Queue::shared_ptr& queue){ TxPublish::Prepare::Prepare(TransactionContext* _ctxt, intrusive_ptr<Message>& _msg) : ctxt(_ctxt), msg(_msg){} -void TxPublish::Prepare::operator()(Queue::shared_ptr& queue){ +void TxPublish::Prepare::operator()(const boost::shared_ptr<Queue>& queue){ if (!queue->enqueue(ctxt, msg)){ /** * if not store then mark message for ack and deleivery once @@ -70,7 +70,7 @@ void TxPublish::Prepare::operator()(Queue::shared_ptr& queue){ TxPublish::Commit::Commit(intrusive_ptr<Message>& _msg) : msg(_msg){} -void TxPublish::Commit::operator()(Queue::shared_ptr& queue){ +void TxPublish::Commit::operator()(const boost::shared_ptr<Queue>& queue){ queue->process(msg); } diff --git a/cpp/src/qpid/broker/TxPublish.h b/cpp/src/qpid/broker/TxPublish.h index d2590debfb..018437f1ed 100644 --- a/cpp/src/qpid/broker/TxPublish.h +++ b/cpp/src/qpid/broker/TxPublish.h @@ -51,14 +51,14 @@ namespace qpid { boost::intrusive_ptr<Message>& msg; public: Prepare(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg); - void operator()(Queue::shared_ptr& queue); + void operator()(const boost::shared_ptr<Queue>& queue); }; class Commit{ boost::intrusive_ptr<Message>& msg; public: Commit(boost::intrusive_ptr<Message>& msg); - void operator()(Queue::shared_ptr& queue); + void operator()(const boost::shared_ptr<Queue>& queue); }; boost::intrusive_ptr<Message> msg; @@ -72,7 +72,7 @@ namespace qpid { virtual Message& getMessage() { return *msg; }; - virtual void deliverTo(Queue::shared_ptr& queue); + virtual void deliverTo(const boost::shared_ptr<Queue>& queue); virtual ~TxPublish(){} diff --git a/cpp/src/qpid/client/FailoverListener.cpp b/cpp/src/qpid/client/FailoverListener.cpp new file mode 100644 index 0000000000..95c137d922 --- /dev/null +++ b/cpp/src/qpid/client/FailoverListener.cpp @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "FailoverListener.h" + +namespace qpid { +namespace client { + +static const std::string AMQ_FAILOVER("amq.failover"); + +FailoverListener::FailoverListener(Connection c) + : connection(c), session(c.newSession()), subscriptions(session) +{ + std::string qname=AMQ_FAILOVER + "." + session.getId().getName(); + if (session.exchangeQuery(arg::exchange=AMQ_FAILOVER).getType().empty()) + return; // Failover exchange not implemented. + session.queueDeclare(arg::queue=qname, arg::exclusive=true, arg::autoDelete=true); + session.exchangeBind(arg::queue=qname, arg::exchange=AMQ_FAILOVER); + subscriptions.subscribe(*this, qname, FlowControl::unlimited()); + thread = sys::Thread(subscriptions); +} + +FailoverListener::~FailoverListener() { + subscriptions.stop(); + if (thread.id()) thread.join(); +} + +void FailoverListener::received(Message& msg) { + sys::Mutex::ScopedLock l(lock); + knowBrokers.clear(); + framing::Array urlArray; + msg.getHeaders().getArray("amq.failover", urlArray); + for (framing::Array::ValueVector::const_iterator i = urlArray.begin(); i < urlArray.end(); ++i ) + knowBrokers.push_back(Url((*i)->get<std::string>())); +} + +std::vector<Url> FailoverListener::getKnownBrokers() const { + sys::Mutex::ScopedLock l(lock); + return knowBrokers; +} + +}} // namespace qpid::client diff --git a/cpp/src/qpid/client/FailoverListener.h b/cpp/src/qpid/client/FailoverListener.h new file mode 100644 index 0000000000..2c06947300 --- /dev/null +++ b/cpp/src/qpid/client/FailoverListener.h @@ -0,0 +1,58 @@ +#ifndef QPID_CLIENT_FAILOVERLISTENER_H +#define QPID_CLIENT_FAILOVERLISTENER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/client/Connection.h" +#include "qpid/client/Session.h" +#include "qpid/client/MessageListener.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/Url.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Thread.h" +#include <vector> + +namespace qpid { +namespace client { + +/** + * @internal Listen for failover updates from the amq.failover exchange. + */ +class FailoverListener : public MessageListener +{ + public: + FailoverListener(Connection); + ~FailoverListener(); + std::vector<Url> getKnownBrokers() const; + void received(Message& msg); + + private: + mutable sys::Mutex lock; + Connection connection; + Session session; + SubscriptionManager subscriptions; + sys::Thread thread; + std::vector<Url> knowBrokers; +}; +}} // namespace qpid::client + +#endif /*!QPID_CLIENT_FAILOVERLISTENER_H*/ diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 9c503d6d13..e64692bc91 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -19,6 +19,7 @@ #include "Cluster.h" #include "Connection.h" #include "DumpClient.h" +#include "FailoverExchange.h" #include "qpid/broker/Broker.h" #include "qpid/broker/SessionState.h" @@ -109,6 +110,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : // FIXME aconway 2008-09-24: // if first cluster up set new UUID to set_clusterID() else set UUID of cluster being joined. } + failoverExchange.reset(new FailoverExchange(this)); broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); cpgDispatchHandle.startWatch(poller); cpg.join(name); @@ -331,15 +333,15 @@ void Cluster::configChange ( Mutex::ScopedLock l(lock); QPID_LOG(debug, *this << " configuration change: " << AddrList(current, nCurrent) << AddrList(left, nLeft, "( ", ")")); - map.configChange(current, nCurrent, left, nLeft, joined, nJoined); - updateMemberStats(l); + bool changed = map.configChange(current, nCurrent, left, nLeft, joined, nJoined); if (state == LEFT) return; if (!map.isAlive(memberId)) { leave(l); return; } - + if(state == INIT) { // First configChange if (map.aliveCount() == 1) { QPID_LOG(info, *this << " first in cluster at " << myUrl); map = ClusterMap(memberId, myUrl, true); + memberUpdate(l); unstall(l); } else { // Joining established group. @@ -348,6 +350,8 @@ void Cluster::configChange ( QPID_LOG(debug, *this << " send dump-request " << myUrl); } } + else if (state >= READY && changed) + memberUpdate(l); } void Cluster::dumpInDone(const ClusterMap& m) { @@ -403,8 +407,9 @@ void Cluster::dumpRequest(const MemberId& id, const std::string& url, Lock& l) { tryMakeOffer(id, l); } -void Cluster::ready(const MemberId& id, const std::string& url, Lock&) { - map.ready(id, Url(url)); +void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { + if (map.ready(id, Url(url))) + memberUpdate(l); } void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, Lock& l) { @@ -454,8 +459,8 @@ void Cluster::checkDumpIn(Lock& l) { if (state == DUMPEE && dumpedMap) { map = *dumpedMap; QPID_LOG(debug, *this << " incoming dump complete. Members: " << map); - unstall(l); mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), 0, l); + unstall(l); } } @@ -488,28 +493,31 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string Lock l(lock); QPID_LOG (debug, *this << " managementMethod [id=" << methodId << "]"); switch (methodId) { - case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(); break; - case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(); break; + case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(l); break; + case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(l); break; default: return Manageable::STATUS_UNKNOWN_METHOD; } return Manageable::STATUS_OK; } -void Cluster::stopClusterNode() { +void Cluster::stopClusterNode(Lock&) { QPID_LOG(notice, *this << " stopped by admin"); leave(); } -void Cluster::stopFullCluster() { - Lock l(lock); +void Cluster::stopFullCluster(Lock& l) { QPID_LOG(notice, *this << " shutting down cluster " << name.str()); mcastControl(ClusterShutdownBody(), 0, l); } -void Cluster::updateMemberStats(Lock& l) { +void Cluster::memberUpdate(Lock& l) { + std::vector<Url> vectUrl = getUrls(l); + size_t size = vectUrl.size(); + + failoverExchange->setUrls(vectUrl); + if (mgmtObject) { - std::vector<Url> vectUrl = getUrls(l); - size_t size = vectUrl.size(); + if (lastSize != size && size == 1){ QPID_LOG(info, *this << " last node standing, updating queue policies."); broker.getQueues().updateQueueClusterState(true); diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index d1cf4b752f..723a23d1bd 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -23,6 +23,7 @@ #include "Event.h" #include "NoOpConnectionOutputHandler.h" #include "ClusterMap.h" +#include "FailoverExchange.h" #include "qpid/broker/Broker.h" #include "qpid/sys/PollableQueue.h" @@ -74,6 +75,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { // URLs of current cluster members. std::vector<Url> getUrls() const; + boost::shared_ptr<FailoverExchange> getFailoverExchange() const { return failoverExchange; } // Leave the cluster void leave(); @@ -93,6 +95,11 @@ class Cluster : private Cpg::Handler, public management::Manageable { typedef sys::PollableQueue<Event> PollableEventQueue; typedef std::deque<Event> PlainEventQueue; + // NB: The final Lock& parameter on functions below is used to mark functions + // that should only be called by a function that already holds the lock. + // The parameter makes it hard to forget since you have to have an instance of + // a Lock to call the unlocked functions. + // Unlocked versions of public functions void mcastControl(const framing::AMQBody& controlBody, Connection* cptr, Lock&); void mcastBuffer(const char*, size_t, const ConnectionId&, uint32_t id, Lock&); @@ -145,9 +152,10 @@ class Cluster : private Cpg::Handler, public management::Manageable { virtual qpid::management::ManagementObject* GetManagementObject() const; virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); - void stopClusterNode(); - void stopFullCluster(); - void updateMemberStats(Lock&); + + void stopClusterNode(Lock&); + void stopFullCluster(Lock&); + void memberUpdate(Lock&); // Called in connection IO threads . void checkDumpIn(Lock&); @@ -181,6 +189,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { boost::optional<ClusterMap> dumpedMap; size_t lastSize; + boost::shared_ptr<FailoverExchange> failoverExchange; friend std::ostream& operator<<(std::ostream&, const Cluster&); friend class ClusterDispatcher; diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp index f3b5451afb..e2fc25bfaa 100644 --- a/cpp/src/qpid/cluster/ClusterMap.cpp +++ b/cpp/src/qpid/cluster/ClusterMap.cpp @@ -72,18 +72,20 @@ ClusterMap::ClusterMap(const FieldTable& newbiesFt, const FieldTable& membersFt) std::for_each(members.begin(), members.end(), boost::bind(&insertSet, boost::ref(alive), _1)); } -void ClusterMap::configChange( +bool ClusterMap::configChange( cpg_address *current, int nCurrent, cpg_address *left, int nLeft, cpg_address */*joined*/, int /*nJoined*/) { cpg_address* a; + bool memberChange=false; for (a = left; a != left+nLeft; ++a) { - members.erase(*a); + memberChange = members.erase(*a); newbies.erase(*a); } alive.clear(); std::copy(current, current+nCurrent, std::inserter(alive, alive.end())); + return memberChange; } Url ClusterMap::getUrl(const Map& map, const MemberId& id) { @@ -133,8 +135,8 @@ bool ClusterMap::dumpRequest(const MemberId& id, const std::string& url) { return false; } -void ClusterMap::ready(const MemberId& id, const Url& url) { - if (isAlive(id)) members[id] = url; +bool ClusterMap::ready(const MemberId& id, const Url& url) { + return isAlive(id) && members.insert(Map::value_type(id,url)).second; } boost::optional<Url> ClusterMap::dumpOffer(const MemberId& from, const MemberId& to) { diff --git a/cpp/src/qpid/cluster/ClusterMap.h b/cpp/src/qpid/cluster/ClusterMap.h index 79afba7dc0..c0012facaf 100644 --- a/cpp/src/qpid/cluster/ClusterMap.h +++ b/cpp/src/qpid/cluster/ClusterMap.h @@ -50,8 +50,10 @@ class ClusterMap { ClusterMap(const MemberId& id, const Url& url, bool isReady); ClusterMap(const framing::FieldTable& urls, const framing::FieldTable& states); - /** Update from config change. */ - void configChange( + /** Update from config change. + *@return true if member set changed. + */ + bool configChange( cpg_address *current, int nCurrent, cpg_address *left, int nLeft, cpg_address *joined, int nJoined); @@ -76,7 +78,9 @@ class ClusterMap { bool dumpRequest(const MemberId& id, const std::string& url); /** Return non-empty Url if accepted */ boost::optional<Url> dumpOffer(const MemberId& from, const MemberId& to); - void ready(const MemberId& id, const Url&); + + /**@return true If this is a new member */ + bool ready(const MemberId& id, const Url&); private: Url getUrl(const Map& map, const MemberId& id); diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 8457467196..14a666a1c6 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -34,6 +34,7 @@ namespace qpid { namespace cluster { using namespace std; +using broker::Broker; struct ClusterValues { string name; @@ -74,12 +75,14 @@ struct ClusterPlugin : public Plugin { Options* getOptions() { return &options; } void initialize(Plugin::Target& target) { - broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); - if (!broker || values.name.empty()) return; // Only if --cluster-name option was specified. + if (values.name.empty()) return; // Only if --cluster-name option was specified. + Broker* broker = dynamic_cast<Broker*>(&target); + if (!broker) return; cluster = new Cluster(values.name, values.getUrl(broker->getPort()), *broker); broker->setConnectionFactory( boost::shared_ptr<sys::ConnectionCodec::Factory>( new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster))); + broker->getExchanges().registerExchange(cluster->getFailoverExchange()); } void earlyInitialize(Plugin::Target&) {} diff --git a/cpp/src/qpid/cluster/FailoverExchange.cpp b/cpp/src/qpid/cluster/FailoverExchange.cpp new file mode 100644 index 0000000000..abc7f5df6f --- /dev/null +++ b/cpp/src/qpid/cluster/FailoverExchange.cpp @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "FailoverExchange.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/DeliverableMessage.h" +#include "qpid/broker/Queue.h" +#include "qpid/framing/MessageProperties.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/AMQHeaderBody.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/log/Statement.h" +#include "qpid/framing/Array.h" +#include <boost/bind.hpp> +#include <algorithm> + +namespace qpid { +namespace cluster { +using namespace std; + +using namespace broker; +using namespace framing; + +const string FailoverExchange::TYPE_NAME("amq.failover"); + +FailoverExchange::FailoverExchange(management::Manageable* parent) : Exchange(TYPE_NAME, parent) { + if (mgmtExchange != 0) + mgmtExchange->set_type(TYPE_NAME); +} + + +void FailoverExchange::setUrls(const vector<Url>& u) { + Lock l(lock); + urls=u; + if (urls.empty()) return; + std::for_each(queues.begin(), queues.end(), + boost::bind(&FailoverExchange::sendUpdate, this, _1)); +} + +string FailoverExchange::getType() const { return TYPE_NAME; } + +bool FailoverExchange::bind(Queue::shared_ptr queue, const string&, const framing::FieldTable*) { + Lock l(lock); + sendUpdate(queue); + return queues.insert(queue).second; +} + +bool FailoverExchange::unbind(Queue::shared_ptr queue, const string&, const framing::FieldTable*) { + Lock l(lock); + return queues.erase(queue); +} + +bool FailoverExchange::isBound(Queue::shared_ptr queue, const string* const, const framing::FieldTable*) { + Lock l(lock); + return queues.find(queue) != queues.end(); +} + +void FailoverExchange::route(Deliverable&, const string& , const framing::FieldTable* ) { + QPID_LOG(warning, "Message received by exchange " << TYPE_NAME << " ignoring"); +} + +void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue) { + // Called with lock held. + if (urls.empty()) return; + framing::Array array(0x95); // FIXME aconway 2008-10-06: Array is unusable like this. Need type constants or better mapping. + for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i) + array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str()))); + const ProtocolVersion v; + boost::intrusive_ptr<Message> msg(new Message); + AMQFrame command(MessageTransferBody(v, TYPE_NAME, 1, 0)); + command.setLastSegment(false); + msg->getFrames().append(command); + AMQHeaderBody header; + header.get<MessageProperties>(true)->setContentLength(0); + header.get<MessageProperties>(true)->getApplicationHeaders().setArray(TYPE_NAME, array); + AMQFrame headerFrame(header); + headerFrame.setFirstSegment(false); + msg->getFrames().append(headerFrame); + DeliverableMessage(msg).deliverTo(queue); +} + +}} // namespace cluster diff --git a/cpp/src/qpid/cluster/FailoverExchange.h b/cpp/src/qpid/cluster/FailoverExchange.h new file mode 100644 index 0000000000..738cd2a602 --- /dev/null +++ b/cpp/src/qpid/cluster/FailoverExchange.h @@ -0,0 +1,68 @@ +#ifndef QPID_CLUSTER_FAILOVEREXCHANGE_H +#define QPID_CLUSTER_FAILOVEREXCHANGE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/broker/Exchange.h" +#include "qpid/broker/DeliverableMessage.h" +#include "qpid/Url.h" + +#include <vector> +#include <set> + +namespace qpid { +namespace cluster { + +/** + * Failover exchange provides failover host list, as specified in AMQP 0-10. + */ +class FailoverExchange : public broker::Exchange +{ + public: + static const std::string TYPE_NAME; + + FailoverExchange(management::Manageable* parent); + + void setUrls(const std::vector<Url>&); + + // Exchange overrides + std::string getType() const; + bool bind(broker::Queue::shared_ptr queue, const std::string& routingKey, const framing::FieldTable* args); + bool unbind(broker::Queue::shared_ptr queue, const std::string& routingKey, const framing::FieldTable* args); + bool isBound(broker::Queue::shared_ptr queue, const std::string* const routingKey, const framing::FieldTable* const args); + void route(broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args); + + private: + void sendUpdate(const broker::Queue::shared_ptr&); + + typedef sys::Mutex::ScopedLock Lock; + typedef std::vector<Url> Urls; + typedef std::set<broker::Queue::shared_ptr> Queues; + + sys::Mutex lock; + Urls urls; + Queues queues; + +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_FAILOVEREXCHANGE_H*/ diff --git a/cpp/src/qpid/framing/Array.cpp b/cpp/src/qpid/framing/Array.cpp index bcee85f472..42d05f71c9 100644 --- a/cpp/src/qpid/framing/Array.cpp +++ b/cpp/src/qpid/framing/Array.cpp @@ -117,7 +117,7 @@ bool Array::operator==(const Array& x) const { void Array::add(ValuePtr value) { if (typeOctet != value->getType()) { - throw IllegalArgumentException(QPID_MSG("Wrong type of value, expected " << typeOctet)); + throw IllegalArgumentException(QPID_MSG("Wrong type of value in Array, expected " << typeOctet << " but found " << value->getType())); } values.push_back(value); } diff --git a/cpp/src/qpid/framing/Array.h b/cpp/src/qpid/framing/Array.h index b18b86fa35..d3ca04dd1d 100644 --- a/cpp/src/qpid/framing/Array.h +++ b/cpp/src/qpid/framing/Array.h @@ -61,13 +61,13 @@ class Array } } + ValueVector::const_iterator begin() const { return values.begin(); } + ValueVector::const_iterator end() const { return values.end(); } + private: uint8_t typeOctet; ValueVector values; - ValueVector::const_iterator begin() const { return values.begin(); } - ValueVector::const_iterator end() const { return values.end(); } - friend std::ostream& operator<<(std::ostream& out, const Array& body); }; diff --git a/cpp/src/qpid/framing/FrameSet.cpp b/cpp/src/qpid/framing/FrameSet.cpp index 5326ab7c14..9846f1d42b 100644 --- a/cpp/src/qpid/framing/FrameSet.cpp +++ b/cpp/src/qpid/framing/FrameSet.cpp @@ -33,7 +33,7 @@ FrameSet::FrameSet(const SequenceNumber& _id) : id(_id),contentSize(0),recalcula void FrameSet::append(const AMQFrame& part) { parts.push_back(part); - recalculateSize = true; + recalculateSize = true; } bool FrameSet::isComplete() const |