summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/UpdateClient.cpp')
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp17
1 files changed, 11 insertions, 6 deletions
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index 97eae7efa3..bb4df8890a 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -26,6 +26,9 @@
#include "ExpiryPolicy.h"
#include "qpid/client/SessionBase_0_10Access.h"
#include "qpid/client/ConnectionAccess.h"
+#include "qpid/client/SessionImpl.h"
+#include "qpid/client/ConnectionImpl.h"
+#include "qpid/client/Future.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueRegistry.h"
@@ -98,10 +101,7 @@ UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, con
expiry(expiry_), connections(cons), decoder(decoder_),
connection(catchUpConnection()), shadowConnection(catchUpConnection()),
done(ok), failed(fail), connectionSettings(cs)
-{
- connection.open(url, cs);
- session = connection.newSession(UPDATE);
-}
+{}
UpdateClient::~UpdateClient() {}
@@ -110,6 +110,8 @@ const std::string UpdateClient::UPDATE("qpid.cluster-update");
void UpdateClient::run() {
try {
+ connection.open(updateeUrl, connectionSettings);
+ session = connection.newSession(UPDATE);
update();
done();
} catch (const std::exception& e) {
@@ -126,15 +128,19 @@ void UpdateClient::update() {
// Update queue is used to transfer acquired messages that are no longer on their original queue.
session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
session.sync();
- session.close();
std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1));
+ session.queueDelete(arg::queue=UPDATE);
+ session.close();
+
+
ClusterConnectionProxy(session).expiryId(expiry.getId());
ClusterConnectionMembershipBody membership;
map.toMethodBody(membership);
AMQFrame frame(membership);
client::ConnectionAccess::getImpl(connection)->handle(frame);
+
connection.close();
QPID_LOG(debug, updaterId << " updated state to " << updateeId << " at " << updateeUrl);
}
@@ -203,7 +209,6 @@ class MessageUpdater {
sb.get()->send(transfer, message.payload->getFrames());
if (message.payload->isContentReleased()){
uint16_t maxFrameSize = sb.get()->getConnection()->getNegotiatedSettings().maxFrameSize;
-
uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
bool morecontent = true;
for (uint64_t offset = 0; morecontent; offset += maxContentSize)