summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp5
-rw-r--r--cpp/src/qpid/broker/SemanticState.h8
-rw-r--r--cpp/src/qpid/broker/SessionState.h1
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp8
-rw-r--r--cpp/src/qpid/cluster/Connection.h2
-rw-r--r--cpp/src/qpid/cluster/DumpClient.cpp10
-rw-r--r--cpp/src/tests/cluster_test.cpp19
-rw-r--r--cpp/xml/cluster.xml18
8 files changed, 62 insertions, 9 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 177157bbb6..4d735c9abc 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -616,6 +616,11 @@ void SemanticState::ConsumerImpl::disableNotify()
notifyEnabled = false;
}
+bool SemanticState::ConsumerImpl::isNotifyEnabld() {
+ Mutex::ScopedLock l(lock);
+ return notifyEnabled;
+}
+
void SemanticState::ConsumerImpl::notify()
{
//TODO: alter this, don't want to hold locks across external
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index 7b72921210..c2d8cc7d0b 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -100,6 +100,7 @@ class SemanticState : public sys::OutputTask,
void disableNotify();
void enableNotify();
void notify();
+ bool isNotifyEnabld();
void setWindowMode();
void setCreditMode();
@@ -109,7 +110,8 @@ class SemanticState : public sys::OutputTask,
void stop();
void complete(DeliveryRecord&);
Queue::shared_ptr getQueue() { return queue; }
- bool isBlocked() const { return blocked; }
+ bool isBlocked() const { return blocked; }
+ bool setBlocked(bool set) { std::swap(set, blocked); return set; }
bool hasOutput();
bool doOutput();
@@ -150,7 +152,7 @@ class SemanticState : public sys::OutputTask,
void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy);
void record(const DeliveryRecord& delivery);
void checkDtxTimeout();
- ConsumerImpl& find(const std::string& destination);
+
void complete(DeliveryRecord&);
AckRange findRange(DeliveryId first, DeliveryId last);
void requestDispatch();
@@ -162,6 +164,8 @@ class SemanticState : public sys::OutputTask,
~SemanticState();
SessionContext& getSession() { return session; }
+
+ ConsumerImpl& find(const std::string& destination);
/**
* Get named queue, never returns 0.
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index 66eba9d2a9..ba750f0cc6 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -101,6 +101,7 @@ class SessionState : public qpid::SessionState,
void readyToSend();
template <class F> void eachConsumer(F f) { semanticState.eachConsumer(f); }
+ SemanticState::ConsumerImpl& getConsumer(const string& dest) { return semanticState.find(dest); }
private:
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 2f1518f871..28391a5c78 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -22,6 +22,7 @@
#include "Cluster.h"
#include "qpid/broker/SessionState.h"
+#include "qpid/broker/SemanticState.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/AllInvoker.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
@@ -159,6 +160,13 @@ void Connection::deliverBuffer(Buffer& buf) {
delivered(mcastDecoder.frame);
}
+void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled) {
+ broker::SessionHandler& h = connection.getChannel(currentChannel);
+ broker::SessionState* s = h.getSession();
+ broker::SemanticState::ConsumerImpl& c = s->getConsumer(name);
+ c.setBlocked(blocked);
+ if (notifyEnabled) c.enableNotify(); else c.disableNotify();
+}
void Connection::sessionState(
const SequenceNumber& replayStart,
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index b537470b41..3b5298a8a1 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -89,6 +89,8 @@ class Connection :
void deliverBuffer(framing::Buffer&);
void delivered(framing::AMQFrame&);
+ void consumerState(const std::string& name, bool blocked, bool notifyEnabled);
+
// ==== Used in catch-up mode to build initial state.
//
// State dump methods.
diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp
index 58aa14655c..c262115f9f 100644
--- a/cpp/src/qpid/cluster/DumpClient.cpp
+++ b/cpp/src/qpid/cluster/DumpClient.cpp
@@ -35,6 +35,8 @@
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/ClusterConnectionMembershipBody.h"
#include "qpid/framing/ClusterConnectionShadowReadyBody.h"
+#include "qpid/framing/ClusterConnectionSessionStateBody.h"
+#include "qpid/framing/ClusterConnectionConsumerStateBody.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/log/Statement.h"
@@ -227,7 +229,13 @@ void DumpClient::dumpConsumer(broker::SemanticState::ConsumerImpl* ci) {
shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit());
shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit());
- // FIXME aconway 2008-09-23: need to replicate ConsumerImpl::blocked and notifyEnabled?
+ ClusterConnectionConsumerStateBody state(
+ ProtocolVersion(),
+ ci->getName(),
+ ci->isBlocked(),
+ ci->isNotifyEnabld()
+ );
+ client::SessionBase_0_10Access(shadowSession).get()->send(state);
QPID_LOG(debug, dumperId << " dumped consumer " << ci->getName() << " on " << shadowSession.getId());
}
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index 68920d1324..99ca5c7161 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -239,8 +239,11 @@ QPID_AUTO_TEST_CASE(DumpConsumers) {
cluster.add();
Client c1(cluster[1], "c1");
+ c1.session.queueDeclare("p");
c1.session.queueDeclare("q");
c1.subs.subscribe(c1.lq, "q", FlowControl::zero());
+ LocalQueue lp;
+ c1.subs.subscribe(lp, "p", FlowControl::messageCredit(1));
c1.session.sync();
// Start new members
@@ -249,22 +252,34 @@ QPID_AUTO_TEST_CASE(DumpConsumers) {
cluster.add();
Client c2(cluster[2], "c2");
- // Transfer a message, verify all members see it.
+ // Transfer messages
c1.session.messageTransfer(arg::content=Message("aaa", "q"));
BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 1u);
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 1u);
BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 1u);
+ c1.session.messageTransfer(arg::content=Message("bbb", "p"));
+ c1.session.messageTransfer(arg::content=Message("ccc", "p"));
+
// Activate the subscription, ensure message removed on all queues.
c1.subs.setFlowControl("q", FlowControl::unlimited());
Message m;
BOOST_CHECK(c1.lq.get(m, TIME_SEC));
BOOST_CHECK_EQUAL(m.getData(), "aaa");
-
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 0u);
+ // Check second subscription's flow control: getsnn first message, not second.
+ BOOST_CHECK(lp.get(m, TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "bbb");
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 1u);
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 1u);
+ BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 1u);
+
+ BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "ccc");
+
// Kill the subscribing member, ensure further messages are not removed.
cluster.killWithSilencer(1,c1.connection,9);
cluster.waitFor(2);
diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml
index 35df70c7ca..1eb33e8333 100644
--- a/cpp/xml/cluster.xml
+++ b/cpp/xml/cluster.xml
@@ -75,9 +75,17 @@
- send shadow-ready to mark end of shadow dump.
- send dump-complete when entire dump is complete.
-->
- <control name="session-state" code="0x4" label="Set session state during a brain dump.">
- <!-- Target session deduced from channel number. -->
+
+ <!-- Consumer state that cannot be set by standard AMQP controls. -->
+ <control name="consumer-state" code="0x10">
+ <field name="name" type="str8"/>
+ <field name="blocked" type="bit"/>
+ <field name="notifyEnabled" type="bit"/>
+ </control>
+ <!-- Complete a session state dump. -->
+ <control name="session-state" code="0x11" label="Set session state during a brain dump.">
+ <!-- Target session deduced from channel number. -->
<field name="replay-start" type="sequence-no"/> <!-- Replay frames will start from this point.-->
<field name="command-point" type="sequence-no"/> <!-- Id of next command sent -->
<field name="sent-incomplete" type="sequence-set"/> <!-- Commands sent and incomplete. -->
@@ -88,12 +96,14 @@
<field name="received-incomplete" type="sequence-set"/> <!-- Received and incomplete -->
</control>
- <control name="shadow-ready" code="0x5" label="End of shadow connection dump.">
+ <!-- Complete a shadow connection dump. -->
+ <control name="shadow-ready" code="0x12" label="End of shadow connection dump.">
<field name="member-id" type="uint64"/>
<field name="connection-id" type="uint64"/>
</control>
- <control name="membership" code="0x6" label="Cluster membership details.">
+ <!-- Complete a cluster state dump. -->
+ <control name="membership" code="0x13" label="Cluster membership details.">
<field name="newbies" type="map"/> <!-- member-id -> URL -->
<field name="members" type="map"/> <!-- member-id -> state -->
</control>