summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/UpdateClient.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/UpdateClient.cpp')
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp40
1 files changed, 34 insertions, 6 deletions
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index bc1b812a94..7d73f3c1db 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -18,12 +18,14 @@
* under the License.
*
*/
+#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/cluster/UpdateClient.h"
#include "qpid/cluster/Cluster.h"
#include "qpid/cluster/ClusterMap.h"
#include "qpid/cluster/Connection.h"
#include "qpid/cluster/Decoder.h"
#include "qpid/cluster/ExpiryPolicy.h"
+#include "qpid/cluster/UpdateDataExchange.h"
#include "qpid/client/SessionBase_0_10Access.h"
#include "qpid/client/ConnectionAccess.h"
#include "qpid/client/SessionImpl.h"
@@ -52,6 +54,7 @@
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/framing/TypeCode.h"
#include "qpid/log/Statement.h"
+#include "qpid/types/Variant.h"
#include "qpid/Url.h"
#include "qmf/org/apache/qpid/broker/ManagementSetupState.h"
#include <boost/bind.hpp>
@@ -62,12 +65,14 @@
namespace qpid {
namespace cluster {
+using amqp_0_10::ListCodec;
using broker::Broker;
using broker::Exchange;
using broker::Queue;
using broker::QueueBinding;
using broker::Message;
using broker::SemanticState;
+using types::Variant;
using namespace framing;
namespace arg=client::arg;
@@ -153,7 +158,6 @@ void UpdateClient::update() {
std::for_each(connections.begin(), connections.end(),
boost::bind(&UpdateClient::updateConnection, this, _1));
session.queueDelete(arg::queue=UPDATE);
- session.close();
// Update queue listeners: must come after sessions so consumerNumbering is populated
b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1));
@@ -162,14 +166,16 @@ void UpdateClient::update() {
updateManagementAgent();
+ session.close();
+
ClusterConnectionMembershipBody membership;
map.toMethodBody(membership);
AMQFrame frame(membership);
client::ConnectionAccess::getImpl(connection)->expand(frame.encodedSize(), false);
client::ConnectionAccess::getImpl(connection)->handle(frame);
- // FIXME aconway 2010-06-16: Connection will be closed from the other end.
- // connection.close();
+ // NOTE: connection will be closed from the other end, don't close
+ // it here as that causes a race.
// FIXME aconway 2010-03-15: This sleep avoids the race condition
// described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831.
@@ -221,12 +227,34 @@ void UpdateClient::updateManagementAgent()
{
management::ManagementAgent* agent = updaterBroker.getManagementAgent();
if (!agent) return;
- // Send management schemas and agents.
string data;
+
+ QPID_LOG(debug, updaterId << " updating management schemas. ")
agent->exportSchemas(data);
- ClusterConnectionProxy(session).managementSchema(data);
+ session.messageTransfer(
+ arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY),
+ arg::destination=UpdateDataExchange::EXCHANGE_NAME);
+
+ QPID_LOG(debug, updaterId << " updating management agents. ")
agent->exportAgents(data);
- ClusterConnectionProxy(session).managementAgents(data);
+ session.messageTransfer(
+ arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_AGENTS_KEY),
+ arg::destination=UpdateDataExchange::EXCHANGE_NAME);
+
+ QPID_LOG(debug, updaterId << " updating management deleted objects. ")
+ typedef management::ManagementAgent::DeletedObjectList DeletedObjectList;
+ DeletedObjectList deleted;
+ agent->exportDeletedObjects(deleted);
+ Variant::List list;
+ for (DeletedObjectList::iterator i = deleted.begin(); i != deleted.end(); ++i) {
+ string encoded;
+ (*i)->encode(encoded);
+ list.push_back(encoded);
+ }
+ ListCodec::encode(list, data);
+ session.messageTransfer(
+ arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_DELETED_OBJECTS_KEY),
+ arg::destination=UpdateDataExchange::EXCHANGE_NAME);
}
void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) {