diff options
Diffstat (limited to 'qpid/cpp')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.h | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/management-schema.xml | 59 | ||||
-rw-r--r-- | qpid/cpp/xml/cluster.xml | 1 |
4 files changed, 52 insertions, 36 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index a0bcb9ae02..70c73191ee 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -78,7 +78,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void dumpRequest(const std::string& url) { cluster.dumpRequest(member, url, l); } void ready(const std::string& url) { cluster.ready(member, url, l); } void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); } - void dumpOffer(uint64_t dumpee) { cluster.dumpOffer(member, dumpee, l); } + void dumpOffer(uint64_t dumpee, const Uuid& id) { cluster.dumpOffer(member, dumpee, id, l); } void dumpStart(uint64_t dumpee, const std::string& url) { cluster.dumpStart(member, dumpee, url, l); } void shutdown() { cluster.shutdown(member, l); } @@ -110,8 +110,6 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : mgmtObject = new qmf::Cluster (agent, this, &broker,name.str(),myUrl.str()); agent->addObject (mgmtObject); mgmtObject->set_status("JOINING"); - // FIXME aconway 2008-09-24: - // if first cluster up set new UUID to set_clusterID() else set UUID of cluster being joined. } broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); failoverExchange.reset(new FailoverExchange(this)); @@ -372,6 +370,7 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& if (state == INIT) { // First configChange if (map.aliveCount() == 1) { + setClusterId(true); QPID_LOG(info, *this << " first in cluster at " << myUrl); state = READY; if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); @@ -395,7 +394,7 @@ void Cluster::tryMakeOffer(const MemberId& id, Lock& l) { if (state == READY && map.isNewbie(id)) { state = OFFER; QPID_LOG(debug, *this << " send dump-offer to " << id); - mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id), l); + mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id, clusterId), l); } } @@ -432,7 +431,7 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { } } -void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, Lock& l) { +void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, const Uuid& uuid, Lock& l) { if (state == LEFT) return; MemberId dumpee(dumpeeInt); boost::optional<Url> url = map.dumpOffer(dumper, dumpee); @@ -450,6 +449,7 @@ void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, Lock& l) { else if (dumpee == myId && url) { assert(state == NEWBIE); QPID_LOG(debug, *this << " accepted dump-offer from " << dumper); + setClusterId(uuid); state = DUMPEE; deliverQueue.stop(); checkDumpIn(l); @@ -580,4 +580,11 @@ broker::Broker& Cluster::getBroker() const { return broker; // Immutable, no need to lock. } +void Cluster::setClusterId(const Uuid& uuid) { + clusterId = uuid; + if (mgmtObject) + mgmtObject->set_clusterID(clusterId.str()); + QPID_LOG(debug, *this << " cluster-id = " << clusterId); +} + }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index 8ee55e68f8..d8b9c958d8 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -42,6 +42,12 @@ #include <map> namespace qpid { + +namespace framing { +class AMQBody; +class Uuid; +} + namespace cluster { class Connection; @@ -118,7 +124,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { // May be called in CPG thread via deliver() OR in deliverQueue thread. // void dumpRequest(const MemberId&, const std::string&, Lock&); - void dumpOffer(const MemberId& dumper, uint64_t dumpee, Lock&); + void dumpOffer(const MemberId& dumper, uint64_t dumpee, const framing::Uuid&, Lock&); void dumpStart(const MemberId& dumper, uint64_t dumpeeInt, const std::string& urlStr, Lock&); void ready(const MemberId&, const std::string&, Lock&); void configChange(const MemberId&, const std::string& addresses, Lock& l); @@ -166,6 +172,8 @@ class Cluster : private Cpg::Handler, public management::Manageable { void dumpOutError(const std::exception&); void dumpOutDone(Lock&); + void setClusterId(const framing::Uuid&); + mutable sys::Monitor lock; broker::Broker& broker; @@ -181,6 +189,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { PollableEventQueue deliverQueue; PlainEventQueue mcastQueue; uint32_t mcastId; + framing::Uuid clusterId; qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle diff --git a/qpid/cpp/src/qpid/cluster/management-schema.xml b/qpid/cpp/src/qpid/cluster/management-schema.xml index 0f30814367..4c25ddf636 100644 --- a/qpid/cpp/src/qpid/cluster/management-schema.xml +++ b/qpid/cpp/src/qpid/cluster/management-schema.xml @@ -1,39 +1,39 @@ <schema package="org.apache.qpid.cluster"> -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. ---> + <!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + --> <!-- Type information: - Numeric types with "_wm" suffix are watermarked numbers. These are compound - values containing a current value, and a low and high water mark for the reporting - interval. The low and high water marks are set to the current value at the - beginning of each interval and track the minimum and maximum values of the statistic - over the interval respectively. +Numeric types with "_wm" suffix are watermarked numbers. These are compound +values containing a current value, and a low and high water mark for the reporting +interval. The low and high water marks are set to the current value at the +beginning of each interval and track the minimum and maximum values of the statistic +over the interval respectively. - Access rights for configuration elements: +Access rights for configuration elements: - RO => Read Only - RC => Read/Create, can be set at create time only, read-only thereafter - RW => Read/Write +RO => Read Only +RC => Read/Create, can be set at create time only, read-only thereafter +RW => Read/Write - If access rights are omitted for a property, they are assumed to be RO. +If access rights are omitted for a property, they are assumed to be RO. --> @@ -44,8 +44,7 @@ <property name="publishedURL" type="sstr" access="RC" desc="URL this node advertizes itself as"/> <property name="clusterSize" type="uint16" access="RO" desc="Number of brokers currently in the cluster"/> <property name="status" type="sstr" access="RO" desc="Cluster node status (STALLED,ACTIVE,JOINING)"/> - <property name="members" type="lstr" access="RO" desc="List of 'host:port' of member nodes in the cluster delimited by ';'"/> - + <property name="members" type="lstr" access="RO" desc="List of member URLs delimited by ';'"/> <method name="stopClusterNode"/> <method name="stopFullCluster"/> diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index 6555ea1162..35df70c7ca 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -33,6 +33,7 @@ <control name = "dump-offer" code="0x2" label="Member offering to be dumper for dumpee."> <field name="dumpee" type="uint64"/> + <field name="cluster-id" type="uuid"/> </control> <control name = "dump-start" code="0x3" label="Used internally by dumper to mark stall point."> |