diff options
Diffstat (limited to 'cpp/src/qpid/cluster/UpdateClient.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 40 |
1 files changed, 34 insertions, 6 deletions
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index bc1b812a94..7d73f3c1db 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -18,12 +18,14 @@ * under the License. * */ +#include "qpid/amqp_0_10/Codecs.h" #include "qpid/cluster/UpdateClient.h" #include "qpid/cluster/Cluster.h" #include "qpid/cluster/ClusterMap.h" #include "qpid/cluster/Connection.h" #include "qpid/cluster/Decoder.h" #include "qpid/cluster/ExpiryPolicy.h" +#include "qpid/cluster/UpdateDataExchange.h" #include "qpid/client/SessionBase_0_10Access.h" #include "qpid/client/ConnectionAccess.h" #include "qpid/client/SessionImpl.h" @@ -52,6 +54,7 @@ #include "qpid/framing/ProtocolVersion.h" #include "qpid/framing/TypeCode.h" #include "qpid/log/Statement.h" +#include "qpid/types/Variant.h" #include "qpid/Url.h" #include "qmf/org/apache/qpid/broker/ManagementSetupState.h" #include <boost/bind.hpp> @@ -62,12 +65,14 @@ namespace qpid { namespace cluster { +using amqp_0_10::ListCodec; using broker::Broker; using broker::Exchange; using broker::Queue; using broker::QueueBinding; using broker::Message; using broker::SemanticState; +using types::Variant; using namespace framing; namespace arg=client::arg; @@ -153,7 +158,6 @@ void UpdateClient::update() { std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1)); session.queueDelete(arg::queue=UPDATE); - session.close(); // Update queue listeners: must come after sessions so consumerNumbering is populated b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1)); @@ -162,14 +166,16 @@ void UpdateClient::update() { updateManagementAgent(); + session.close(); + ClusterConnectionMembershipBody membership; map.toMethodBody(membership); AMQFrame frame(membership); client::ConnectionAccess::getImpl(connection)->expand(frame.encodedSize(), false); client::ConnectionAccess::getImpl(connection)->handle(frame); - // FIXME aconway 2010-06-16: Connection will be closed from the other end. - // connection.close(); + // NOTE: connection will be closed from the other end, don't close + // it here as that causes a race. // FIXME aconway 2010-03-15: This sleep avoids the race condition // described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831. @@ -221,12 +227,34 @@ void UpdateClient::updateManagementAgent() { management::ManagementAgent* agent = updaterBroker.getManagementAgent(); if (!agent) return; - // Send management schemas and agents. string data; + + QPID_LOG(debug, updaterId << " updating management schemas. ") agent->exportSchemas(data); - ClusterConnectionProxy(session).managementSchema(data); + session.messageTransfer( + arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY), + arg::destination=UpdateDataExchange::EXCHANGE_NAME); + + QPID_LOG(debug, updaterId << " updating management agents. ") agent->exportAgents(data); - ClusterConnectionProxy(session).managementAgents(data); + session.messageTransfer( + arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_AGENTS_KEY), + arg::destination=UpdateDataExchange::EXCHANGE_NAME); + + QPID_LOG(debug, updaterId << " updating management deleted objects. ") + typedef management::ManagementAgent::DeletedObjectList DeletedObjectList; + DeletedObjectList deleted; + agent->exportDeletedObjects(deleted); + Variant::List list; + for (DeletedObjectList::iterator i = deleted.begin(); i != deleted.end(); ++i) { + string encoded; + (*i)->encode(encoded); + list.push_back(encoded); + } + ListCodec::encode(list, data); + session.messageTransfer( + arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_DELETED_OBJECTS_KEY), + arg::destination=UpdateDataExchange::EXCHANGE_NAME); } void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) { |