summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/broker/QueueListeners.h5
-rw-r--r--cpp/src/qpid/cluster/Cluster.h34
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp39
-rw-r--r--cpp/src/qpid/cluster/Connection.h22
-rw-r--r--cpp/src/qpid/cluster/Numbering.h70
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp35
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.h4
-rw-r--r--cpp/src/qpid/cluster/UpdateReceiver.h42
-rw-r--r--cpp/src/tests/Makefile.am5
-rw-r--r--cpp/xml/cluster.xml7
10 files changed, 213 insertions, 50 deletions
diff --git a/cpp/src/qpid/broker/QueueListeners.h b/cpp/src/qpid/broker/QueueListeners.h
index 9bb847ff94..51ef58eb06 100644
--- a/cpp/src/qpid/broker/QueueListeners.h
+++ b/cpp/src/qpid/broker/QueueListeners.h
@@ -57,6 +57,11 @@ class QueueListeners
void populate(NotificationSet&);
bool contains(Consumer::shared_ptr c) const;
+ template <class F> void eachListener(F f) {
+ std::for_each(browsers.begin(), browsers.end(), f);
+ std::for_each(consumers.begin(), consumers.end(), f);
+ }
+
private:
Listeners consumers;
Listeners browsers;
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index e83cf6a1e2..87280f682b 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -19,21 +19,22 @@
*
*/
-#include "qpid/cluster/ClusterMap.h"
-#include "qpid/cluster/ClusterSettings.h"
-#include "qpid/cluster/Cpg.h"
-#include "qpid/cluster/Decoder.h"
-#include "qpid/cluster/ErrorCheck.h"
-#include "qpid/cluster/Event.h"
-#include "qpid/cluster/EventFrame.h"
-#include "qpid/cluster/ExpiryPolicy.h"
-#include "qpid/cluster/FailoverExchange.h"
-#include "qpid/cluster/LockedConnectionMap.h"
-#include "qpid/cluster/Multicaster.h"
-#include "qpid/cluster/NoOpConnectionOutputHandler.h"
-#include "qpid/cluster/PollableQueue.h"
-#include "qpid/cluster/PollerDispatch.h"
-#include "qpid/cluster/Quorum.h"
+#include "ClusterMap.h"
+#include "ClusterSettings.h"
+#include "Cpg.h"
+#include "Decoder.h"
+#include "ErrorCheck.h"
+#include "Event.h"
+#include "EventFrame.h"
+#include "ExpiryPolicy.h"
+#include "FailoverExchange.h"
+#include "LockedConnectionMap.h"
+#include "Multicaster.h"
+#include "NoOpConnectionOutputHandler.h"
+#include "PollableQueue.h"
+#include "PollerDispatch.h"
+#include "Quorum.h"
+#include "UpdateReceiver.h"
#include "qmf/org/apache/qpid/cluster/Cluster.h"
#include "qpid/Url.h"
@@ -114,6 +115,8 @@ class Cluster : private Cpg::Handler, public management::Manageable {
ExpiryPolicy& getExpiryPolicy() { return *expiryPolicy; }
+ UpdateReceiver& getUpdateReceiver() { return updateReceiver; }
+
private:
typedef sys::Monitor::ScopedLock Lock;
@@ -258,6 +261,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
boost::optional<ClusterMap> updatedMap;
bool updateRetracted;
ErrorCheck error;
+ UpdateReceiver updateReceiver;
friend std::ostream& operator<<(std::ostream&, const Cluster&);
friend class ClusterDispatcher;
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 15cd028e10..ce3f922a02 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -18,9 +18,10 @@
* under the License.
*
*/
-#include "qpid/cluster/Connection.h"
-#include "qpid/cluster/UpdateClient.h"
-#include "qpid/cluster/Cluster.h"
+#include "Connection.h"
+#include "UpdateClient.h"
+#include "Cluster.h"
+#include "UpdateReceiver.h"
#include "qpid/broker/SessionState.h"
#include "qpid/broker/SemanticState.h"
@@ -44,9 +45,8 @@
// TODO aconway 2008-11-03:
//
-// Disproportionate amount of code here is dedicated to receiving an
-// update when joining a cluster and building initial
-// state. Should be separated out into its own classes.
+// Refactor code for receiving an update into a separate UpdateConnection
+// class.
//
@@ -73,7 +73,8 @@ sys::AtomicValue<uint64_t> idCounter;
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId, const ConnectionId& id)
: cluster(c), self(id), catchUp(false), output(*this, out),
connection(&output, cluster.getBroker(), logId), expectProtocolHeader(false),
- mcastFrameHandler(cluster.getMulticast(), self)
+ mcastFrameHandler(cluster.getMulticast(), self),
+ consumerNumbering(c.getUpdateReceiver().consumerNumbering)
{ init(); }
// Local connection
@@ -81,7 +82,8 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& logId, MemberId member, bool isCatchUp, bool isLink)
: cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out),
connection(&output, cluster.getBroker(), logId, isLink, catchUp ? ++catchUpId : 0),
- expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self)
+ expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self),
+ consumerNumbering(c.getUpdateReceiver().consumerNumbering)
{ init(); }
void Connection::init() {
@@ -251,15 +253,15 @@ broker::SemanticState& Connection::semanticState() {
return sessionState().getSemanticState();
}
-void Connection::consumerState(
- const string& name, bool blocked, bool notifyEnabled, bool isInListener)
+void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled)
{
broker::SemanticState::ConsumerImpl& c = semanticState().find(name);
c.setBlocked(blocked);
if (notifyEnabled) c.enableNotify(); else c.disableNotify();
- if (isInListener) c.getQueue()->getListeners().addListener(c.shared_from_this());
+ consumerNumbering.add(c.shared_from_this());
}
+
void Connection::sessionState(
const SequenceNumber& replayStart,
const SequenceNumber& sendCommandPoint,
@@ -306,6 +308,7 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const str
void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameSeq) {
QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
cluster.updateInDone(ClusterMap(joiners, members, frameSeq));
+ consumerNumbering.clear();
self.second = 0; // Mark this as completed update connection.
}
@@ -378,10 +381,8 @@ void Connection::deliveryRecord(const string& qname,
}
void Connection::queuePosition(const string& qname, const SequenceNumber& position) {
- boost::shared_ptr<broker::Queue> q = cluster.getBroker().getQueues().find(qname);
- if (!q) throw InvalidArgumentException(QPID_MSG("Invalid queue name " << qname));
- q->setPosition(position);
- }
+ findQueue(qname)->setPosition(position);
+}
void Connection::expiryId(uint64_t id) {
cluster.getExpiryPolicy().setId(id);
@@ -450,5 +451,11 @@ void Connection::connectionError(const std::string& msg) {
cluster.flagError(*this, ERROR_TYPE_CONNECTION, msg);
}
-}} // namespace qpid::cluster
+void Connection::addQueueListener(const std::string& q, uint32_t listener) {
+ if (listener >= consumerNumbering.size())
+ throw Exception(QPID_MSG("Invalid listener ID: " << listener));
+ findQueue(q)->getListeners().addListener(consumerNumbering[listener]);
+}
+
+}} // Namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index ac2b6cd50c..e15c23ccf2 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -22,12 +22,14 @@
*
*/
-#include "qpid/cluster/types.h"
-#include "qpid/cluster/OutputInterceptor.h"
-#include "qpid/cluster/EventFrame.h"
-#include "qpid/cluster/McastFrameHandler.h"
+#include "types.h"
+#include "OutputInterceptor.h"
+#include "EventFrame.h"
+#include "McastFrameHandler.h"
+#include "UpdateReceiver.h"
#include "qpid/broker/Connection.h"
+#include "qpid/broker/SemanticState.h"
#include "qpid/amqp_0_10/Connection.h"
#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/ConnectionInputHandler.h"
@@ -103,7 +105,7 @@ class Connection :
// Called for data delivered from the cluster.
void deliveredFrame(const EventFrame&);
- void consumerState(const std::string& name, bool blocked, bool notifyEnabled, bool isInListener);
+ void consumerState(const std::string& name, bool blocked, bool notifyEnabled);
// ==== Used in catch-up mode to build initial state.
//
@@ -113,7 +115,8 @@ class Connection :
const framing::SequenceSet& sentIncomplete,
const framing::SequenceNumber& expected,
const framing::SequenceNumber& received,
- const framing::SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete);
+ const framing::SequenceSet& unknownCompleted,
+ const SequenceSet& receivedIncomplete);
void outputTask(uint16_t channel, const std::string& name);
@@ -143,9 +146,9 @@ class Connection :
void txAccept(const framing::SequenceSet&);
void txDequeue(const std::string&);
void txEnqueue(const std::string&);
- void txPublish(const qpid::framing::Array&, bool);
+ void txPublish(const framing::Array&, bool);
void txEnd();
- void accumulatedAck(const qpid::framing::SequenceSet&);
+ void accumulatedAck(const framing::SequenceSet&);
// Encoded queue/exchange replication.
void queue(const std::string& encoded);
@@ -158,6 +161,8 @@ class Connection :
OutputInterceptor& getOutput() { return output; }
+ void addQueueListener(const std::string& queue, uint32_t listener);
+
private:
struct NullFrameHandler : public framing::FrameHandler {
void handle(framing::AMQFrame&) {}
@@ -190,6 +195,7 @@ class Connection :
boost::shared_ptr<broker::TxBuffer> txBuffer;
bool expectProtocolHeader;
McastFrameHandler mcastFrameHandler;
+ UpdateReceiver::ConsumerNumbering& consumerNumbering;
static qpid::sys::AtomicValue<uint64_t> catchUpId;
diff --git a/cpp/src/qpid/cluster/Numbering.h b/cpp/src/qpid/cluster/Numbering.h
new file mode 100644
index 0000000000..2d2d931384
--- /dev/null
+++ b/cpp/src/qpid/cluster/Numbering.h
@@ -0,0 +1,70 @@
+#ifndef QPID_CLUSTER_NUMBERING_H
+#define QPID_CLUSTER_NUMBERING_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <map>
+#include <vector>
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * A set of numbered T, with two way mapping number->T T->number
+ * Used to construct numberings of objects by code sending and receiving updates.
+ */
+template <class T> class Numbering
+{
+ public:
+ size_t size() const { return byNumber.size(); }
+
+ size_t add(const T& t) {
+ size_t n = (*this)[t]; // Already in the set?
+ if (n == size()) {
+ byObject[t] = n;
+ byNumber.push_back(t);
+ }
+ return n;
+ }
+
+ void clear() { byObject.clear(); byNumber.clear(); }
+
+ /**@return object at index n or T() if n > size() */
+ T operator[](size_t n) const { return(n < size()) ? byNumber[n] : T(); }
+
+ /**@return index of t or size() if t is not in the map */
+ size_t operator[](const T& t) const {
+ typename Map::const_iterator i = byObject.find(t);
+ return (i != byObject.end()) ? i->second : size();
+ }
+
+ bool contains(const T& t) const { return (*this)[t] == size(); }
+
+ private:
+ typedef std::map<T, size_t> Map;
+ Map byObject;
+ std::vector<T> byNumber;
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_NUMBERING_H*/
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index 1e9af4a589..143db20ac0 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -128,16 +128,17 @@ void UpdateClient::update() {
Broker& b = updaterBroker;
b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1));
b.getQueues().eachQueue(boost::bind(&UpdateClient::updateNonExclusiveQueue, this, _1));
+
// Update queue is used to transfer acquired messages that are no
// longer on their original queue.
session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
session.sync();
-
std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1));
-
session.queueDelete(arg::queue=UPDATE);
session.close();
+ // Update queue listeners: must come after sessions so consumerNumbering is populated.
+ b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1));
ClusterConnectionProxy(session).expiryId(expiry.getId());
ClusterConnectionMembershipBody membership;
@@ -295,11 +296,12 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda
}
void UpdateClient::updateSession(broker::SessionHandler& sh) {
- QPID_LOG(debug, updaterId << " updating session " << &sh.getConnection() << "[" << sh.getChannel() << "] = "
- << sh.getSession()->getId());
broker::SessionState* ss = sh.getSession();
if (!ss) return; // no session.
+ QPID_LOG(debug, updaterId << " updating session " << &sh.getConnection()
+ << "[" << sh.getChannel() << "] = " << ss->getId());
+
// Create a client session to update session state.
boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection);
boost::shared_ptr<client::SessionImpl> simpl = cimpl->newSession(ss->getId().getName(), ss->getTimeout(), sh.getChannel());
@@ -350,6 +352,7 @@ void UpdateClient::updateConsumer(
{
QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on "
<< shadowSession.getId());
+
using namespace message;
shadowSession.messageSubscribe(
arg::queue = ci->getQueue()->getName(),
@@ -367,10 +370,12 @@ void UpdateClient::updateConsumer(
ClusterConnectionProxy(shadowSession).consumerState(
ci->getName(),
ci->isBlocked(),
- ci->isNotifyEnabled(),
- ci->getQueue()->getListeners().contains(ci)
+ ci->isNotifyEnabled()
);
- QPID_LOG(debug, updaterId << " updated consumer " << ci->getName() << " on " << shadowSession.getId());
+ consumerNumbering.add(ci);
+
+ QPID_LOG(debug, updaterId << " updated consumer " << ci->getName()
+ << " on " << shadowSession.getId());
}
void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) {
@@ -448,4 +453,20 @@ void UpdateClient::updateTxState(broker::SemanticState& s) {
}
}
+void UpdateClient::updateQueueListeners(const boost::shared_ptr<broker::Queue>& queue) {
+ queue->getListeners().eachListener(
+ boost::bind(&UpdateClient::updateQueueListener, this, queue->getName(), _1));
+}
+
+void UpdateClient::updateQueueListener(std::string& q,
+ const boost::shared_ptr<broker::Consumer>& c)
+{
+ const boost::shared_ptr<SemanticState::ConsumerImpl> ci =
+ boost::dynamic_pointer_cast<SemanticState::ConsumerImpl>(c);
+ size_t n = consumerNumbering[ci];
+ if (n >= consumerNumbering.size())
+ throw Exception(QPID_MSG("Unexpected listener on queue " << q));
+ ClusterConnectionProxy(session).addQueueListener(q, n);
+}
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h
index 4970026547..29ef5f9df2 100644
--- a/cpp/src/qpid/cluster/UpdateClient.h
+++ b/cpp/src/qpid/cluster/UpdateClient.h
@@ -23,6 +23,7 @@
*/
#include "qpid/cluster/ClusterMap.h"
+#include "qpid/cluster/Numbering.h"
#include "qpid/client/Connection.h"
#include "qpid/client/ConnectionSettings.h"
#include "qpid/client/AsyncSession.h"
@@ -94,7 +95,10 @@ class UpdateClient : public sys::Runnable {
void updateTxState(broker::SemanticState& s);
void updateOutputTask(const sys::OutputTask* task);
void updateConsumer(const broker::SemanticState::ConsumerImpl::shared_ptr&);
+ void updateQueueListeners(const boost::shared_ptr<broker::Queue>&);
+ void updateQueueListener(std::string& q, const boost::shared_ptr<broker::Consumer>& c);
+ Numbering<broker::SemanticState::ConsumerImpl::shared_ptr> consumerNumbering;
MemberId updaterId;
MemberId updateeId;
Url updateeUrl;
diff --git a/cpp/src/qpid/cluster/UpdateReceiver.h b/cpp/src/qpid/cluster/UpdateReceiver.h
new file mode 100644
index 0000000000..cc1ce0da8d
--- /dev/null
+++ b/cpp/src/qpid/cluster/UpdateReceiver.h
@@ -0,0 +1,42 @@
+#ifndef QPID_CLUSTER_UPDATESTATE_H
+#define QPID_CLUSTER_UPDATESTATE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Numbering.h"
+#include "qpid/broker/SemanticState.h"
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Cluster-wide state used when receiving an update.
+ */
+class UpdateReceiver {
+ public:
+ /** Numbering used to identify Queue listeners as consumers */
+ typedef Numbering<boost::shared_ptr<broker::SemanticState::ConsumerImpl> > ConsumerNumbering;
+ ConsumerNumbering consumerNumbering;
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_UPDATESTATE_H*/
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 3700ab4b4b..9fda235481 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -132,7 +132,7 @@ libshlibtest_la_SOURCES = shlibtest.cpp
tmodule_LTLIBRARIES += test_store.la
test_store_la_SOURCES = test_store.cpp
-test_store_la_LIBADD = $(lib_broker) # FIXME aconway 2009-04-03: required?
+test_store_la_LIBADD = $(lib_broker)
test_store_la_LDFLAGS = -module
include cluster.mk
@@ -286,7 +286,8 @@ CLEANFILES+=valgrind.out *.log *.vglog* dummy_test $(unit_wrappers)
# Longer running stability tests, not run by default check: target.
# Not run under valgrind, too slow
-LONG_TESTS+=start_broker fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test run_failover_soak stop_broker \
+LONG_TESTS+=start_broker fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test stop_broker \
+ run_failover_soak \
federated_cluster_test_with_node_failure
# TODO: renable the temporarily disabled the failing reliable_replication_test when QPID-1984 is resolved.
diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml
index ab66179a05..92917dcfa6 100644
--- a/cpp/xml/cluster.xml
+++ b/cpp/xml/cluster.xml
@@ -108,8 +108,6 @@
<field name="name" type="str8"/>
<field name="blocked" type="bit"/>
<field name="notifyEnabled" type="bit"/>
- <!-- Flag set if the consumer is in its queue's listener set. -->
- <field name="is-in-listener" type="bit"/>
</control>
<!-- Delivery-record for outgoing messages sent but not yet accepted. -->
@@ -191,5 +189,10 @@
<!-- Set expiry-id for subsequent messages. -->
<control name="expiry-id" code="0x33"><field name="expiry-id" type="uint64"/></control>
+ <!-- Add a listener to a queue -->
+ <control name="add-queue-listener" code="0x34">
+ <field name="queue" type="str8"/>
+ <field name="consumer" type="uint32"/>
+ </control>
</class>
</amqp>