diff options
Diffstat (limited to 'cpp/src/qpid/cluster/UpdateClient.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 35 |
1 files changed, 28 insertions, 7 deletions
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index 1e9af4a589..143db20ac0 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -128,16 +128,17 @@ void UpdateClient::update() { Broker& b = updaterBroker; b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1)); b.getQueues().eachQueue(boost::bind(&UpdateClient::updateNonExclusiveQueue, this, _1)); + // Update queue is used to transfer acquired messages that are no // longer on their original queue. session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true); session.sync(); - std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1)); - session.queueDelete(arg::queue=UPDATE); session.close(); + // Update queue listeners: must come after sessions so consumerNumbering is populated. + b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1)); ClusterConnectionProxy(session).expiryId(expiry.getId()); ClusterConnectionMembershipBody membership; @@ -295,11 +296,12 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda } void UpdateClient::updateSession(broker::SessionHandler& sh) { - QPID_LOG(debug, updaterId << " updating session " << &sh.getConnection() << "[" << sh.getChannel() << "] = " - << sh.getSession()->getId()); broker::SessionState* ss = sh.getSession(); if (!ss) return; // no session. + QPID_LOG(debug, updaterId << " updating session " << &sh.getConnection() + << "[" << sh.getChannel() << "] = " << ss->getId()); + // Create a client session to update session state. boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection); boost::shared_ptr<client::SessionImpl> simpl = cimpl->newSession(ss->getId().getName(), ss->getTimeout(), sh.getChannel()); @@ -350,6 +352,7 @@ void UpdateClient::updateConsumer( { QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on " << shadowSession.getId()); + using namespace message; shadowSession.messageSubscribe( arg::queue = ci->getQueue()->getName(), @@ -367,10 +370,12 @@ void UpdateClient::updateConsumer( ClusterConnectionProxy(shadowSession).consumerState( ci->getName(), ci->isBlocked(), - ci->isNotifyEnabled(), - ci->getQueue()->getListeners().contains(ci) + ci->isNotifyEnabled() ); - QPID_LOG(debug, updaterId << " updated consumer " << ci->getName() << " on " << shadowSession.getId()); + consumerNumbering.add(ci); + + QPID_LOG(debug, updaterId << " updated consumer " << ci->getName() + << " on " << shadowSession.getId()); } void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) { @@ -448,4 +453,20 @@ void UpdateClient::updateTxState(broker::SemanticState& s) { } } +void UpdateClient::updateQueueListeners(const boost::shared_ptr<broker::Queue>& queue) { + queue->getListeners().eachListener( + boost::bind(&UpdateClient::updateQueueListener, this, queue->getName(), _1)); +} + +void UpdateClient::updateQueueListener(std::string& q, + const boost::shared_ptr<broker::Consumer>& c) +{ + const boost::shared_ptr<SemanticState::ConsumerImpl> ci = + boost::dynamic_pointer_cast<SemanticState::ConsumerImpl>(c); + size_t n = consumerNumbering[ci]; + if (n >= consumerNumbering.size()) + throw Exception(QPID_MSG("Unexpected listener on queue " << q)); + ClusterConnectionProxy(session).addQueueListener(q, n); +} + }} // namespace qpid::cluster |