diff options
author | Stephen D. Huston <shuston@apache.org> | 2009-04-23 21:06:35 +0000 |
---|---|---|
committer | Stephen D. Huston <shuston@apache.org> | 2009-04-23 21:06:35 +0000 |
commit | a7259adba14345898e78b483b7620340ffa5cfc5 (patch) | |
tree | e8d26c0981a666442ad4aa2fff5ddb87c5ce5866 /qpid/cpp/src/qpid/cluster/UpdateClient.cpp | |
parent | 8d32b03448e8e1ba6319fc0ac484d0ab54b29b38 (diff) | |
download | qpid-python-cmake.tar.gz |
Merge in trunk changes from r758432:768028cmake
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/cmake@768053 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/UpdateClient.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateClient.cpp | 17 |
1 files changed, 11 insertions, 6 deletions
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index 97eae7efa3..bb4df8890a 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -26,6 +26,9 @@ #include "ExpiryPolicy.h" #include "qpid/client/SessionBase_0_10Access.h" #include "qpid/client/ConnectionAccess.h" +#include "qpid/client/SessionImpl.h" +#include "qpid/client/ConnectionImpl.h" +#include "qpid/client/Future.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueRegistry.h" @@ -98,10 +101,7 @@ UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, con expiry(expiry_), connections(cons), decoder(decoder_), connection(catchUpConnection()), shadowConnection(catchUpConnection()), done(ok), failed(fail), connectionSettings(cs) -{ - connection.open(url, cs); - session = connection.newSession(UPDATE); -} +{} UpdateClient::~UpdateClient() {} @@ -110,6 +110,8 @@ const std::string UpdateClient::UPDATE("qpid.cluster-update"); void UpdateClient::run() { try { + connection.open(updateeUrl, connectionSettings); + session = connection.newSession(UPDATE); update(); done(); } catch (const std::exception& e) { @@ -126,15 +128,19 @@ void UpdateClient::update() { // Update queue is used to transfer acquired messages that are no longer on their original queue. session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true); session.sync(); - session.close(); std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1)); + session.queueDelete(arg::queue=UPDATE); + session.close(); + + ClusterConnectionProxy(session).expiryId(expiry.getId()); ClusterConnectionMembershipBody membership; map.toMethodBody(membership); AMQFrame frame(membership); client::ConnectionAccess::getImpl(connection)->handle(frame); + connection.close(); QPID_LOG(debug, updaterId << " updated state to " << updateeId << " at " << updateeUrl); } @@ -203,7 +209,6 @@ class MessageUpdater { sb.get()->send(transfer, message.payload->getFrames()); if (message.payload->isContentReleased()){ uint16_t maxFrameSize = sb.get()->getConnection()->getNegotiatedSettings().maxFrameSize; - uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); bool morecontent = true; for (uint64_t offset = 0; morecontent; offset += maxContentSize) |