summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/UpdateClient.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/UpdateClient.cpp')
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp35
1 files changed, 28 insertions, 7 deletions
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index 1e9af4a589..143db20ac0 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -128,16 +128,17 @@ void UpdateClient::update() {
Broker& b = updaterBroker;
b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1));
b.getQueues().eachQueue(boost::bind(&UpdateClient::updateNonExclusiveQueue, this, _1));
+
// Update queue is used to transfer acquired messages that are no
// longer on their original queue.
session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
session.sync();
-
std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1));
-
session.queueDelete(arg::queue=UPDATE);
session.close();
+ // Update queue listeners: must come after sessions so consumerNumbering is populated.
+ b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1));
ClusterConnectionProxy(session).expiryId(expiry.getId());
ClusterConnectionMembershipBody membership;
@@ -295,11 +296,12 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda
}
void UpdateClient::updateSession(broker::SessionHandler& sh) {
- QPID_LOG(debug, updaterId << " updating session " << &sh.getConnection() << "[" << sh.getChannel() << "] = "
- << sh.getSession()->getId());
broker::SessionState* ss = sh.getSession();
if (!ss) return; // no session.
+ QPID_LOG(debug, updaterId << " updating session " << &sh.getConnection()
+ << "[" << sh.getChannel() << "] = " << ss->getId());
+
// Create a client session to update session state.
boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection);
boost::shared_ptr<client::SessionImpl> simpl = cimpl->newSession(ss->getId().getName(), ss->getTimeout(), sh.getChannel());
@@ -350,6 +352,7 @@ void UpdateClient::updateConsumer(
{
QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on "
<< shadowSession.getId());
+
using namespace message;
shadowSession.messageSubscribe(
arg::queue = ci->getQueue()->getName(),
@@ -367,10 +370,12 @@ void UpdateClient::updateConsumer(
ClusterConnectionProxy(shadowSession).consumerState(
ci->getName(),
ci->isBlocked(),
- ci->isNotifyEnabled(),
- ci->getQueue()->getListeners().contains(ci)
+ ci->isNotifyEnabled()
);
- QPID_LOG(debug, updaterId << " updated consumer " << ci->getName() << " on " << shadowSession.getId());
+ consumerNumbering.add(ci);
+
+ QPID_LOG(debug, updaterId << " updated consumer " << ci->getName()
+ << " on " << shadowSession.getId());
}
void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) {
@@ -448,4 +453,20 @@ void UpdateClient::updateTxState(broker::SemanticState& s) {
}
}
+void UpdateClient::updateQueueListeners(const boost::shared_ptr<broker::Queue>& queue) {
+ queue->getListeners().eachListener(
+ boost::bind(&UpdateClient::updateQueueListener, this, queue->getName(), _1));
+}
+
+void UpdateClient::updateQueueListener(std::string& q,
+ const boost::shared_ptr<broker::Consumer>& c)
+{
+ const boost::shared_ptr<SemanticState::ConsumerImpl> ci =
+ boost::dynamic_pointer_cast<SemanticState::ConsumerImpl>(c);
+ size_t n = consumerNumbering[ci];
+ if (n >= consumerNumbering.size())
+ throw Exception(QPID_MSG("Unexpected listener on queue " << q));
+ ClusterConnectionProxy(session).addQueueListener(q, n);
+}
+
}} // namespace qpid::cluster