summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp39
1 files changed, 23 insertions, 16 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 15cd028e10..ce3f922a02 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -18,9 +18,10 @@
* under the License.
*
*/
-#include "qpid/cluster/Connection.h"
-#include "qpid/cluster/UpdateClient.h"
-#include "qpid/cluster/Cluster.h"
+#include "Connection.h"
+#include "UpdateClient.h"
+#include "Cluster.h"
+#include "UpdateReceiver.h"
#include "qpid/broker/SessionState.h"
#include "qpid/broker/SemanticState.h"
@@ -44,9 +45,8 @@
// TODO aconway 2008-11-03:
//
-// Disproportionate amount of code here is dedicated to receiving an
-// update when joining a cluster and building initial
-// state. Should be separated out into its own classes.
+// Refactor code for receiving an update into a separate UpdateConnection
+// class.
//
@@ -73,7 +73,8 @@ sys::AtomicValue<uint64_t> idCounter;
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId, const ConnectionId& id)
: cluster(c), self(id), catchUp(false), output(*this, out),
connection(&output, cluster.getBroker(), logId), expectProtocolHeader(false),
- mcastFrameHandler(cluster.getMulticast(), self)
+ mcastFrameHandler(cluster.getMulticast(), self),
+ consumerNumbering(c.getUpdateReceiver().consumerNumbering)
{ init(); }
// Local connection
@@ -81,7 +82,8 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& logId, MemberId member, bool isCatchUp, bool isLink)
: cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out),
connection(&output, cluster.getBroker(), logId, isLink, catchUp ? ++catchUpId : 0),
- expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self)
+ expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self),
+ consumerNumbering(c.getUpdateReceiver().consumerNumbering)
{ init(); }
void Connection::init() {
@@ -251,15 +253,15 @@ broker::SemanticState& Connection::semanticState() {
return sessionState().getSemanticState();
}
-void Connection::consumerState(
- const string& name, bool blocked, bool notifyEnabled, bool isInListener)
+void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled)
{
broker::SemanticState::ConsumerImpl& c = semanticState().find(name);
c.setBlocked(blocked);
if (notifyEnabled) c.enableNotify(); else c.disableNotify();
- if (isInListener) c.getQueue()->getListeners().addListener(c.shared_from_this());
+ consumerNumbering.add(c.shared_from_this());
}
+
void Connection::sessionState(
const SequenceNumber& replayStart,
const SequenceNumber& sendCommandPoint,
@@ -306,6 +308,7 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const str
void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameSeq) {
QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
cluster.updateInDone(ClusterMap(joiners, members, frameSeq));
+ consumerNumbering.clear();
self.second = 0; // Mark this as completed update connection.
}
@@ -378,10 +381,8 @@ void Connection::deliveryRecord(const string& qname,
}
void Connection::queuePosition(const string& qname, const SequenceNumber& position) {
- boost::shared_ptr<broker::Queue> q = cluster.getBroker().getQueues().find(qname);
- if (!q) throw InvalidArgumentException(QPID_MSG("Invalid queue name " << qname));
- q->setPosition(position);
- }
+ findQueue(qname)->setPosition(position);
+}
void Connection::expiryId(uint64_t id) {
cluster.getExpiryPolicy().setId(id);
@@ -450,5 +451,11 @@ void Connection::connectionError(const std::string& msg) {
cluster.flagError(*this, ERROR_TYPE_CONNECTION, msg);
}
-}} // namespace qpid::cluster
+void Connection::addQueueListener(const std::string& q, uint32_t listener) {
+ if (listener >= consumerNumbering.size())
+ throw Exception(QPID_MSG("Invalid listener ID: " << listener));
+ findQueue(q)->getListeners().addListener(consumerNumbering[listener]);
+}
+
+}} // Namespace qpid::cluster