diff options
author | Alan Conway <aconway@apache.org> | 2009-07-16 16:28:14 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-07-16 16:28:14 +0000 |
commit | 2263213d7dfa3aaba38360144f7b098fd0a96bee (patch) | |
tree | c5ae73ed7c26362b5eb9f1a5298bab74501056f9 /cpp/src | |
parent | b70e5a051527440f6a764d08a96adf908c5a8af0 (diff) | |
download | qpid-python-2263213d7dfa3aaba38360144f7b098fd0a96bee.tar.gz |
Update queue listeners in the correct order.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@794736 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/QueueListeners.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 34 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 39 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 22 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Numbering.h | 70 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 35 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateReceiver.h | 42 | ||||
-rw-r--r-- | cpp/src/tests/Makefile.am | 5 |
9 files changed, 208 insertions, 48 deletions
diff --git a/cpp/src/qpid/broker/QueueListeners.h b/cpp/src/qpid/broker/QueueListeners.h index 9bb847ff94..51ef58eb06 100644 --- a/cpp/src/qpid/broker/QueueListeners.h +++ b/cpp/src/qpid/broker/QueueListeners.h @@ -57,6 +57,11 @@ class QueueListeners void populate(NotificationSet&); bool contains(Consumer::shared_ptr c) const; + template <class F> void eachListener(F f) { + std::for_each(browsers.begin(), browsers.end(), f); + std::for_each(consumers.begin(), consumers.end(), f); + } + private: Listeners consumers; Listeners browsers; diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index e83cf6a1e2..87280f682b 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -19,21 +19,22 @@ * */ -#include "qpid/cluster/ClusterMap.h" -#include "qpid/cluster/ClusterSettings.h" -#include "qpid/cluster/Cpg.h" -#include "qpid/cluster/Decoder.h" -#include "qpid/cluster/ErrorCheck.h" -#include "qpid/cluster/Event.h" -#include "qpid/cluster/EventFrame.h" -#include "qpid/cluster/ExpiryPolicy.h" -#include "qpid/cluster/FailoverExchange.h" -#include "qpid/cluster/LockedConnectionMap.h" -#include "qpid/cluster/Multicaster.h" -#include "qpid/cluster/NoOpConnectionOutputHandler.h" -#include "qpid/cluster/PollableQueue.h" -#include "qpid/cluster/PollerDispatch.h" -#include "qpid/cluster/Quorum.h" +#include "ClusterMap.h" +#include "ClusterSettings.h" +#include "Cpg.h" +#include "Decoder.h" +#include "ErrorCheck.h" +#include "Event.h" +#include "EventFrame.h" +#include "ExpiryPolicy.h" +#include "FailoverExchange.h" +#include "LockedConnectionMap.h" +#include "Multicaster.h" +#include "NoOpConnectionOutputHandler.h" +#include "PollableQueue.h" +#include "PollerDispatch.h" +#include "Quorum.h" +#include "UpdateReceiver.h" #include "qmf/org/apache/qpid/cluster/Cluster.h" #include "qpid/Url.h" @@ -114,6 +115,8 @@ class Cluster : private Cpg::Handler, public management::Manageable { ExpiryPolicy& getExpiryPolicy() { return *expiryPolicy; } + UpdateReceiver& getUpdateReceiver() { return updateReceiver; } + private: typedef sys::Monitor::ScopedLock Lock; @@ -258,6 +261,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { boost::optional<ClusterMap> updatedMap; bool updateRetracted; ErrorCheck error; + UpdateReceiver updateReceiver; friend std::ostream& operator<<(std::ostream&, const Cluster&); friend class ClusterDispatcher; diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 15cd028e10..ce3f922a02 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -18,9 +18,10 @@ * under the License. * */ -#include "qpid/cluster/Connection.h" -#include "qpid/cluster/UpdateClient.h" -#include "qpid/cluster/Cluster.h" +#include "Connection.h" +#include "UpdateClient.h" +#include "Cluster.h" +#include "UpdateReceiver.h" #include "qpid/broker/SessionState.h" #include "qpid/broker/SemanticState.h" @@ -44,9 +45,8 @@ // TODO aconway 2008-11-03: // -// Disproportionate amount of code here is dedicated to receiving an -// update when joining a cluster and building initial -// state. Should be separated out into its own classes. +// Refactor code for receiving an update into a separate UpdateConnection +// class. // @@ -73,7 +73,8 @@ sys::AtomicValue<uint64_t> idCounter; Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId, const ConnectionId& id) : cluster(c), self(id), catchUp(false), output(*this, out), connection(&output, cluster.getBroker(), logId), expectProtocolHeader(false), - mcastFrameHandler(cluster.getMulticast(), self) + mcastFrameHandler(cluster.getMulticast(), self), + consumerNumbering(c.getUpdateReceiver().consumerNumbering) { init(); } // Local connection @@ -81,7 +82,8 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId, MemberId member, bool isCatchUp, bool isLink) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out), connection(&output, cluster.getBroker(), logId, isLink, catchUp ? ++catchUpId : 0), - expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self) + expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self), + consumerNumbering(c.getUpdateReceiver().consumerNumbering) { init(); } void Connection::init() { @@ -251,15 +253,15 @@ broker::SemanticState& Connection::semanticState() { return sessionState().getSemanticState(); } -void Connection::consumerState( - const string& name, bool blocked, bool notifyEnabled, bool isInListener) +void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled) { broker::SemanticState::ConsumerImpl& c = semanticState().find(name); c.setBlocked(blocked); if (notifyEnabled) c.enableNotify(); else c.disableNotify(); - if (isInListener) c.getQueue()->getListeners().addListener(c.shared_from_this()); + consumerNumbering.add(c.shared_from_this()); } + void Connection::sessionState( const SequenceNumber& replayStart, const SequenceNumber& sendCommandPoint, @@ -306,6 +308,7 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const str void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameSeq) { QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); cluster.updateInDone(ClusterMap(joiners, members, frameSeq)); + consumerNumbering.clear(); self.second = 0; // Mark this as completed update connection. } @@ -378,10 +381,8 @@ void Connection::deliveryRecord(const string& qname, } void Connection::queuePosition(const string& qname, const SequenceNumber& position) { - boost::shared_ptr<broker::Queue> q = cluster.getBroker().getQueues().find(qname); - if (!q) throw InvalidArgumentException(QPID_MSG("Invalid queue name " << qname)); - q->setPosition(position); - } + findQueue(qname)->setPosition(position); +} void Connection::expiryId(uint64_t id) { cluster.getExpiryPolicy().setId(id); @@ -450,5 +451,11 @@ void Connection::connectionError(const std::string& msg) { cluster.flagError(*this, ERROR_TYPE_CONNECTION, msg); } -}} // namespace qpid::cluster +void Connection::addQueueListener(const std::string& q, uint32_t listener) { + if (listener >= consumerNumbering.size()) + throw Exception(QPID_MSG("Invalid listener ID: " << listener)); + findQueue(q)->getListeners().addListener(consumerNumbering[listener]); +} + +}} // Namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index ac2b6cd50c..e15c23ccf2 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -22,12 +22,14 @@ * */ -#include "qpid/cluster/types.h" -#include "qpid/cluster/OutputInterceptor.h" -#include "qpid/cluster/EventFrame.h" -#include "qpid/cluster/McastFrameHandler.h" +#include "types.h" +#include "OutputInterceptor.h" +#include "EventFrame.h" +#include "McastFrameHandler.h" +#include "UpdateReceiver.h" #include "qpid/broker/Connection.h" +#include "qpid/broker/SemanticState.h" #include "qpid/amqp_0_10/Connection.h" #include "qpid/sys/AtomicValue.h" #include "qpid/sys/ConnectionInputHandler.h" @@ -103,7 +105,7 @@ class Connection : // Called for data delivered from the cluster. void deliveredFrame(const EventFrame&); - void consumerState(const std::string& name, bool blocked, bool notifyEnabled, bool isInListener); + void consumerState(const std::string& name, bool blocked, bool notifyEnabled); // ==== Used in catch-up mode to build initial state. // @@ -113,7 +115,8 @@ class Connection : const framing::SequenceSet& sentIncomplete, const framing::SequenceNumber& expected, const framing::SequenceNumber& received, - const framing::SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete); + const framing::SequenceSet& unknownCompleted, + const SequenceSet& receivedIncomplete); void outputTask(uint16_t channel, const std::string& name); @@ -143,9 +146,9 @@ class Connection : void txAccept(const framing::SequenceSet&); void txDequeue(const std::string&); void txEnqueue(const std::string&); - void txPublish(const qpid::framing::Array&, bool); + void txPublish(const framing::Array&, bool); void txEnd(); - void accumulatedAck(const qpid::framing::SequenceSet&); + void accumulatedAck(const framing::SequenceSet&); // Encoded queue/exchange replication. void queue(const std::string& encoded); @@ -158,6 +161,8 @@ class Connection : OutputInterceptor& getOutput() { return output; } + void addQueueListener(const std::string& queue, uint32_t listener); + private: struct NullFrameHandler : public framing::FrameHandler { void handle(framing::AMQFrame&) {} @@ -190,6 +195,7 @@ class Connection : boost::shared_ptr<broker::TxBuffer> txBuffer; bool expectProtocolHeader; McastFrameHandler mcastFrameHandler; + UpdateReceiver::ConsumerNumbering& consumerNumbering; static qpid::sys::AtomicValue<uint64_t> catchUpId; diff --git a/cpp/src/qpid/cluster/Numbering.h b/cpp/src/qpid/cluster/Numbering.h new file mode 100644 index 0000000000..2d2d931384 --- /dev/null +++ b/cpp/src/qpid/cluster/Numbering.h @@ -0,0 +1,70 @@ +#ifndef QPID_CLUSTER_NUMBERING_H +#define QPID_CLUSTER_NUMBERING_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <map> +#include <vector> + +namespace qpid { +namespace cluster { + +/** + * A set of numbered T, with two way mapping number->T T->number + * Used to construct numberings of objects by code sending and receiving updates. + */ +template <class T> class Numbering +{ + public: + size_t size() const { return byNumber.size(); } + + size_t add(const T& t) { + size_t n = (*this)[t]; // Already in the set? + if (n == size()) { + byObject[t] = n; + byNumber.push_back(t); + } + return n; + } + + void clear() { byObject.clear(); byNumber.clear(); } + + /**@return object at index n or T() if n > size() */ + T operator[](size_t n) const { return(n < size()) ? byNumber[n] : T(); } + + /**@return index of t or size() if t is not in the map */ + size_t operator[](const T& t) const { + typename Map::const_iterator i = byObject.find(t); + return (i != byObject.end()) ? i->second : size(); + } + + bool contains(const T& t) const { return (*this)[t] == size(); } + + private: + typedef std::map<T, size_t> Map; + Map byObject; + std::vector<T> byNumber; +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_NUMBERING_H*/ diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index 1e9af4a589..143db20ac0 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -128,16 +128,17 @@ void UpdateClient::update() { Broker& b = updaterBroker; b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1)); b.getQueues().eachQueue(boost::bind(&UpdateClient::updateNonExclusiveQueue, this, _1)); + // Update queue is used to transfer acquired messages that are no // longer on their original queue. session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true); session.sync(); - std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1)); - session.queueDelete(arg::queue=UPDATE); session.close(); + // Update queue listeners: must come after sessions so consumerNumbering is populated. + b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1)); ClusterConnectionProxy(session).expiryId(expiry.getId()); ClusterConnectionMembershipBody membership; @@ -295,11 +296,12 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda } void UpdateClient::updateSession(broker::SessionHandler& sh) { - QPID_LOG(debug, updaterId << " updating session " << &sh.getConnection() << "[" << sh.getChannel() << "] = " - << sh.getSession()->getId()); broker::SessionState* ss = sh.getSession(); if (!ss) return; // no session. + QPID_LOG(debug, updaterId << " updating session " << &sh.getConnection() + << "[" << sh.getChannel() << "] = " << ss->getId()); + // Create a client session to update session state. boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection); boost::shared_ptr<client::SessionImpl> simpl = cimpl->newSession(ss->getId().getName(), ss->getTimeout(), sh.getChannel()); @@ -350,6 +352,7 @@ void UpdateClient::updateConsumer( { QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on " << shadowSession.getId()); + using namespace message; shadowSession.messageSubscribe( arg::queue = ci->getQueue()->getName(), @@ -367,10 +370,12 @@ void UpdateClient::updateConsumer( ClusterConnectionProxy(shadowSession).consumerState( ci->getName(), ci->isBlocked(), - ci->isNotifyEnabled(), - ci->getQueue()->getListeners().contains(ci) + ci->isNotifyEnabled() ); - QPID_LOG(debug, updaterId << " updated consumer " << ci->getName() << " on " << shadowSession.getId()); + consumerNumbering.add(ci); + + QPID_LOG(debug, updaterId << " updated consumer " << ci->getName() + << " on " << shadowSession.getId()); } void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) { @@ -448,4 +453,20 @@ void UpdateClient::updateTxState(broker::SemanticState& s) { } } +void UpdateClient::updateQueueListeners(const boost::shared_ptr<broker::Queue>& queue) { + queue->getListeners().eachListener( + boost::bind(&UpdateClient::updateQueueListener, this, queue->getName(), _1)); +} + +void UpdateClient::updateQueueListener(std::string& q, + const boost::shared_ptr<broker::Consumer>& c) +{ + const boost::shared_ptr<SemanticState::ConsumerImpl> ci = + boost::dynamic_pointer_cast<SemanticState::ConsumerImpl>(c); + size_t n = consumerNumbering[ci]; + if (n >= consumerNumbering.size()) + throw Exception(QPID_MSG("Unexpected listener on queue " << q)); + ClusterConnectionProxy(session).addQueueListener(q, n); +} + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h index 4970026547..29ef5f9df2 100644 --- a/cpp/src/qpid/cluster/UpdateClient.h +++ b/cpp/src/qpid/cluster/UpdateClient.h @@ -23,6 +23,7 @@ */ #include "qpid/cluster/ClusterMap.h" +#include "qpid/cluster/Numbering.h" #include "qpid/client/Connection.h" #include "qpid/client/ConnectionSettings.h" #include "qpid/client/AsyncSession.h" @@ -94,7 +95,10 @@ class UpdateClient : public sys::Runnable { void updateTxState(broker::SemanticState& s); void updateOutputTask(const sys::OutputTask* task); void updateConsumer(const broker::SemanticState::ConsumerImpl::shared_ptr&); + void updateQueueListeners(const boost::shared_ptr<broker::Queue>&); + void updateQueueListener(std::string& q, const boost::shared_ptr<broker::Consumer>& c); + Numbering<broker::SemanticState::ConsumerImpl::shared_ptr> consumerNumbering; MemberId updaterId; MemberId updateeId; Url updateeUrl; diff --git a/cpp/src/qpid/cluster/UpdateReceiver.h b/cpp/src/qpid/cluster/UpdateReceiver.h new file mode 100644 index 0000000000..cc1ce0da8d --- /dev/null +++ b/cpp/src/qpid/cluster/UpdateReceiver.h @@ -0,0 +1,42 @@ +#ifndef QPID_CLUSTER_UPDATESTATE_H +#define QPID_CLUSTER_UPDATESTATE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Numbering.h" +#include "qpid/broker/SemanticState.h" + +namespace qpid { +namespace cluster { + +/** + * Cluster-wide state used when receiving an update. + */ +class UpdateReceiver { + public: + /** Numbering used to identify Queue listeners as consumers */ + typedef Numbering<boost::shared_ptr<broker::SemanticState::ConsumerImpl> > ConsumerNumbering; + ConsumerNumbering consumerNumbering; +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_UPDATESTATE_H*/ diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 3700ab4b4b..9fda235481 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -132,7 +132,7 @@ libshlibtest_la_SOURCES = shlibtest.cpp tmodule_LTLIBRARIES += test_store.la test_store_la_SOURCES = test_store.cpp -test_store_la_LIBADD = $(lib_broker) # FIXME aconway 2009-04-03: required? +test_store_la_LIBADD = $(lib_broker) test_store_la_LDFLAGS = -module include cluster.mk @@ -286,7 +286,8 @@ CLEANFILES+=valgrind.out *.log *.vglog* dummy_test $(unit_wrappers) # Longer running stability tests, not run by default check: target. # Not run under valgrind, too slow -LONG_TESTS+=start_broker fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test run_failover_soak stop_broker \ +LONG_TESTS+=start_broker fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test stop_broker \ + run_failover_soak \ federated_cluster_test_with_node_failure # TODO: renable the temporarily disabled the failing reliable_replication_test when QPID-1984 is resolved. |