diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 39 |
1 files changed, 23 insertions, 16 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 15cd028e10..ce3f922a02 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -18,9 +18,10 @@ * under the License. * */ -#include "qpid/cluster/Connection.h" -#include "qpid/cluster/UpdateClient.h" -#include "qpid/cluster/Cluster.h" +#include "Connection.h" +#include "UpdateClient.h" +#include "Cluster.h" +#include "UpdateReceiver.h" #include "qpid/broker/SessionState.h" #include "qpid/broker/SemanticState.h" @@ -44,9 +45,8 @@ // TODO aconway 2008-11-03: // -// Disproportionate amount of code here is dedicated to receiving an -// update when joining a cluster and building initial -// state. Should be separated out into its own classes. +// Refactor code for receiving an update into a separate UpdateConnection +// class. // @@ -73,7 +73,8 @@ sys::AtomicValue<uint64_t> idCounter; Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId, const ConnectionId& id) : cluster(c), self(id), catchUp(false), output(*this, out), connection(&output, cluster.getBroker(), logId), expectProtocolHeader(false), - mcastFrameHandler(cluster.getMulticast(), self) + mcastFrameHandler(cluster.getMulticast(), self), + consumerNumbering(c.getUpdateReceiver().consumerNumbering) { init(); } // Local connection @@ -81,7 +82,8 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId, MemberId member, bool isCatchUp, bool isLink) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out), connection(&output, cluster.getBroker(), logId, isLink, catchUp ? ++catchUpId : 0), - expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self) + expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self), + consumerNumbering(c.getUpdateReceiver().consumerNumbering) { init(); } void Connection::init() { @@ -251,15 +253,15 @@ broker::SemanticState& Connection::semanticState() { return sessionState().getSemanticState(); } -void Connection::consumerState( - const string& name, bool blocked, bool notifyEnabled, bool isInListener) +void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled) { broker::SemanticState::ConsumerImpl& c = semanticState().find(name); c.setBlocked(blocked); if (notifyEnabled) c.enableNotify(); else c.disableNotify(); - if (isInListener) c.getQueue()->getListeners().addListener(c.shared_from_this()); + consumerNumbering.add(c.shared_from_this()); } + void Connection::sessionState( const SequenceNumber& replayStart, const SequenceNumber& sendCommandPoint, @@ -306,6 +308,7 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const str void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameSeq) { QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); cluster.updateInDone(ClusterMap(joiners, members, frameSeq)); + consumerNumbering.clear(); self.second = 0; // Mark this as completed update connection. } @@ -378,10 +381,8 @@ void Connection::deliveryRecord(const string& qname, } void Connection::queuePosition(const string& qname, const SequenceNumber& position) { - boost::shared_ptr<broker::Queue> q = cluster.getBroker().getQueues().find(qname); - if (!q) throw InvalidArgumentException(QPID_MSG("Invalid queue name " << qname)); - q->setPosition(position); - } + findQueue(qname)->setPosition(position); +} void Connection::expiryId(uint64_t id) { cluster.getExpiryPolicy().setId(id); @@ -450,5 +451,11 @@ void Connection::connectionError(const std::string& msg) { cluster.flagError(*this, ERROR_TYPE_CONNECTION, msg); } -}} // namespace qpid::cluster +void Connection::addQueueListener(const std::string& q, uint32_t listener) { + if (listener >= consumerNumbering.size()) + throw Exception(QPID_MSG("Invalid listener ID: " << listener)); + findQueue(q)->getListeners().addListener(consumerNumbering[listener]); +} + +}} // Namespace qpid::cluster |